Skip to content

Commit 4923761

Browse files
Gencebay DemirGencebay Demir
authored andcommitted
WebSocket Binary Message Support (Chunked Stream) Added
1 parent a446646 commit 4923761

File tree

15 files changed

+238
-27
lines changed

15 files changed

+238
-27
lines changed

src/NetCoreStack.WebSockets.ProxyClient/ApplicationBuilderExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public static IApplicationBuilder UseProxyWebSockets(this IApplicationBuilder ap
1919
if (webSocketConnector != null && appLifeTime != null)
2020
{
2121
appLifeTime.ApplicationStopping.Register(OnShutdown, webSocketConnector);
22-
Task.Run(async () => await webSocketConnector.InitializeAsync());
22+
Task.Run(async () => await webSocketConnector.ConnectAsync());
2323
}
2424

2525
return app;

src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketConnector.cs

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using Microsoft.Extensions.Options;
22
using System;
3+
using System.IO;
4+
using System.Linq;
35
using System.Net.WebSockets;
46
using System.Threading;
57
using System.Threading.Tasks;
@@ -13,38 +15,82 @@ internal class ClientWebSocketConnector : IWebSocketConnector
1315
private readonly ConnectorOptions _options;
1416
private readonly InvocatorRegistry _invocatorRegistry;
1517

16-
public ClientWebSocketConnector(IOptions<ConnectorOptions> options, InvocatorRegistry invocatorRegistry)
18+
public ClientWebSocketConnector(IOptions<ConnectorOptions> options,
19+
InvocatorRegistry invocatorRegistry)
1720
{
1821
_options = options.Value;
1922
_invocatorRegistry = invocatorRegistry;
2023
}
2124

22-
public async Task InitializeAsync()
25+
private async Task Receive()
2326
{
24-
var uri = new Uri($"ws://{_options.WebSocketHostAddress}");
25-
_webSocket = new ClientWebSocket();
26-
await _webSocket.ConnectAsync(uri, CancellationToken.None);
27-
var buffer = new byte[1024 * 4];
27+
var buffer = new byte[SocketsConstants.ChunkSize];
2828
var result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
2929
while (!result.CloseStatus.HasValue)
3030
{
31-
var context = result.ToContext(buffer);
32-
if (context.Command == WebSocketCommands.Handshake)
33-
_connectionId = context.Value.ToString();
31+
if (result.MessageType == WebSocketMessageType.Text)
32+
{
33+
var context = result.ToContext(buffer);
34+
if (context.Command == WebSocketCommands.Handshake)
35+
_connectionId = context.Value.ToString();
36+
37+
var _invocators = _invocatorRegistry.GetInvocators(context);
38+
_invocators.ForEach(async x => await x.InvokeAsync(context));
39+
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
40+
}
3441

35-
var _invocators = _invocatorRegistry.GetInvocators(context);
36-
_invocators.ForEach(async x => await x.InvokeAsync(context));
37-
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
42+
if (result.MessageType == WebSocketMessageType.Binary)
43+
{
44+
byte[] binaryResult = null;
45+
using (var ms = new MemoryStream())
46+
{
47+
while (!result.EndOfMessage)
48+
{
49+
if (!result.CloseStatus.HasValue)
50+
{
51+
await ms.WriteAsync(buffer, 0, result.Count);
52+
}
53+
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
54+
}
55+
if (result.EndOfMessage)
56+
{
57+
if (!result.CloseStatus.HasValue)
58+
{
59+
await ms.WriteAsync(buffer, 0, result.Count);
60+
}
61+
}
62+
binaryResult = ms.ToArray();
63+
}
64+
var context = result.ToBinaryContext(binaryResult);
65+
var _invocators = _invocatorRegistry.GetInvocators(context);
66+
_invocators.ForEach(async x => await x.InvokeAsync(context));
67+
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
68+
}
3869
}
3970
await _webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
4071
}
4172

73+
public async Task ConnectAsync()
74+
{
75+
var uri = new Uri($"ws://{_options.WebSocketHostAddress}");
76+
_webSocket = new ClientWebSocket();
77+
await _webSocket.ConnectAsync(uri, CancellationToken.None);
78+
await Task.WhenAll(Receive());
79+
}
80+
4281
public async Task SendAsync(WebSocketMessageContext context)
4382
{
4483
var segments = context.ToSegment();
4584
await _webSocket.SendAsync(segments, WebSocketMessageType.Text, true, CancellationToken.None);
4685
}
4786

87+
public async Task SendBinaryAsync(byte[] bytes)
88+
{
89+
// TODO Chunked
90+
var segments = new ArraySegment<byte>(bytes, 0, bytes.Count());
91+
await _webSocket.SendAsync(segments, WebSocketMessageType.Binary, true, CancellationToken.None);
92+
}
93+
4894
internal void Close(string statusDescription)
4995
{
5096
_webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, statusDescription, CancellationToken.None);

src/NetCoreStack.WebSockets.ProxyClient/IWebSocketConnector.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ namespace NetCoreStack.WebSockets.ProxyClient
44
{
55
public interface IWebSocketConnector
66
{
7-
Task InitializeAsync();
7+
Task ConnectAsync();
88
Task SendAsync(WebSocketMessageContext context);
9+
Task SendBinaryAsync(byte[] bytes);
910
}
1011
}

src/NetCoreStack.WebSockets/Internal/ConnectionManager.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
using Microsoft.Extensions.Logging;
2+
using Newtonsoft.Json;
23
using System;
34
using System.Collections.Concurrent;
5+
using System.IO;
6+
using System.Linq;
47
using System.Net.WebSockets;
8+
using System.Text;
59
using System.Threading;
610
using System.Threading.Tasks;
711

@@ -81,6 +85,48 @@ public async Task SendAsync(string connectionId, WebSocketMessageContext context
8185
await SendAsync(transport, descriptor);
8286
}
8387

88+
public async Task SendBinaryAsync(string connectionId, byte[] bytes, JsonObject properties)
89+
{
90+
WebSocketTransport transport = null;
91+
if (!Connections.TryGetValue(connectionId, out transport))
92+
{
93+
throw new ArgumentOutOfRangeException(nameof(transport));
94+
}
95+
96+
var props = JsonConvert.SerializeObject(properties);
97+
var propsBytes = Encoding.UTF8.GetBytes($"{SocketsConstants.Splitter}{props}");
98+
99+
var bytesCount = bytes.Length;
100+
bytes = bytes.Concat(propsBytes).ToArray();
101+
102+
var buffer = new byte[SocketsConstants.ChunkSize];
103+
using (var ms = new MemoryStream(bytes))
104+
{
105+
using (BinaryReader br = new BinaryReader(ms))
106+
{
107+
byte[] chunkBytes = null;
108+
do
109+
{
110+
chunkBytes = br.ReadBytes(SocketsConstants.ChunkSize);
111+
var segments = new ArraySegment<byte>(chunkBytes);
112+
var endOfMessage = false;
113+
114+
if (chunkBytes.Length < SocketsConstants.ChunkSize)
115+
endOfMessage = true;
116+
117+
await transport.WebSocket.SendAsync(segments,
118+
WebSocketMessageType.Binary,
119+
endOfMessage,
120+
CancellationToken.None);
121+
122+
if (endOfMessage)
123+
break;
124+
125+
} while (chunkBytes.Length <= SocketsConstants.ChunkSize);
126+
}
127+
}
128+
}
129+
84130
public async Task SendAsync(string connectionId, WebSocketMessageContext context, WebSocket webSocket)
85131
{
86132
if (string.IsNullOrEmpty(connectionId))

src/NetCoreStack.WebSockets/Internal/IConnectionManager.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using NetCoreStack.WebSockets;
2-
using System.Net.WebSockets;
1+
using System.Net.WebSockets;
32
using System.Threading.Tasks;
43

54
namespace NetCoreStack.WebSockets.Internal
@@ -10,6 +9,8 @@ public interface IConnectionManager
109

1110
Task SendAsync(string connectionId, WebSocketMessageContext context);
1211

12+
Task SendBinaryAsync(string connectionId, byte[] bytes, JsonObject properties);
13+
1314
Task SendAsync(string connectionId, WebSocketMessageContext context, WebSocket webSocket);
1415
void CloseConnection(string connectionId);
1516
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace NetCoreStack.WebSockets
2+
{
3+
public class SocketsConstants
4+
{
5+
public const string Splitter = "<<";
6+
public const int ChunkSize = 1024 * 4;
7+
}
8+
}

src/NetCoreStack.WebSockets/NetCoreStack.WebSockets.xproj

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,16 @@
44
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion>
55
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
66
</PropertyGroup>
7-
87
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
98
<PropertyGroup Label="Globals">
109
<ProjectGuid>c6bd4266-256a-4ff6-b991-d44efc825fc6</ProjectGuid>
11-
<RootNamespace>NetCoreStack.WebSockets.Common</RootNamespace>
10+
<RootNamespace>NetCoreStack.WebSockets</RootNamespace>
1211
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
1312
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
1413
<TargetFrameworkVersion>v4.6</TargetFrameworkVersion>
1514
</PropertyGroup>
16-
1715
<PropertyGroup>
1816
<SchemaVersion>2.0</SchemaVersion>
1917
</PropertyGroup>
2018
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" />
21-
</Project>
19+
</Project>
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System.Collections.Generic;
2+
3+
namespace NetCoreStack.WebSockets
4+
{
5+
public abstract class JsonObject
6+
{
7+
public JsonObject()
8+
{
9+
}
10+
11+
public IDictionary<string, object> ToJson()
12+
{
13+
Dictionary<string, object> dictionary = new Dictionary<string, object>();
14+
Serialize(dictionary);
15+
return dictionary;
16+
}
17+
18+
protected abstract void Serialize(IDictionary<string, object> value);
19+
}
20+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace NetCoreStack.WebSockets
5+
{
6+
public class SocketObject : JsonObject
7+
{
8+
public string Key { get; set; }
9+
10+
public object Value { get; set; }
11+
12+
protected override void Serialize(IDictionary<string, object> value)
13+
{
14+
value.Add(nameof(Key), Key);
15+
value.Add(nameof(Value), Value);
16+
}
17+
}
18+
}

src/NetCoreStack.WebSockets/WebSocketExtensions.cs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,51 @@
11
using Newtonsoft.Json;
22
using System;
3+
using System.Linq;
34
using System.Net.WebSockets;
45
using System.Text;
56

67
namespace NetCoreStack.WebSockets
78
{
89
public static class WebSocketExtensions
910
{
10-
public static WebSocketMessageContext ToContext(this WebSocketReceiveResult result, byte[] buffer)
11+
public static WebSocketMessageContext ToContext(this WebSocketReceiveResult result, byte[] values)
1112
{
1213
if (result == null)
1314
{
1415
throw new ArgumentNullException(nameof(result));
1516
}
1617

17-
var content = Encoding.UTF8.GetString(buffer, 0, result.Count);
18+
var content = Encoding.UTF8.GetString(values, 0, result.Count);
1819
var webSocketContext = JsonConvert.DeserializeObject<WebSocketMessageContext>(content);
1920
return webSocketContext;
2021
}
2122

23+
public static WebSocketMessageContext ToBinaryContext(this WebSocketReceiveResult result, byte[] values)
24+
{
25+
if (result == null)
26+
{
27+
throw new ArgumentNullException(nameof(result));
28+
}
29+
30+
var webSocketContext = new WebSocketMessageContext();
31+
var content = Encoding.UTF8.GetString(values);
32+
if (content != null)
33+
{
34+
string[] parts = content.Split(new string[] { SocketsConstants.Splitter }, StringSplitOptions.None);
35+
if (parts.Length != 2)
36+
{
37+
throw new InvalidOperationException(nameof(parts));
38+
}
39+
40+
webSocketContext.Value = parts.First();
41+
webSocketContext.State = JsonConvert.DeserializeObject<SocketObject>(parts.Last());
42+
}
43+
44+
webSocketContext.MessageType = WebSocketMessageType.Binary;
45+
webSocketContext.Command = WebSocketCommands.DataSend;
46+
return webSocketContext;
47+
}
48+
2249
public static ArraySegment<byte> ToSegment(this WebSocketMessageContext webSocketContext)
2350
{
2451
if (webSocketContext == null)

0 commit comments

Comments
 (0)