Skip to content

Commit bad6931

Browse files
gedemgedem
authored andcommitted
Optional compression, dictionary properties, GZip helper
1 parent 5868947 commit bad6931

17 files changed

+252
-72
lines changed

src/NetCoreStack.WebSockets/ConnectionManager.cs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
using Newtonsoft.Json;
66
using System;
77
using System.Collections.Concurrent;
8+
using System.Collections.Generic;
89
using System.IO;
910
using System.Linq;
1011
using System.Net.WebSockets;
1112
using System.Text;
1213
using System.Threading;
1314
using System.Threading.Tasks;
15+
using static NetCoreStack.WebSockets.Internal.SocketsConstants;
1416

1517
namespace NetCoreStack.WebSockets
1618
{
@@ -37,20 +39,32 @@ public ConnectionManager(IStreamCompressor compressor,
3739
Connections = new ConcurrentDictionary<string, WebSocketTransport>();
3840
}
3941

40-
private async Task<byte[]> PrepareBytesAsync(byte[] input, JsonObject properties)
42+
private async Task<byte[]> PrepareBytesAsync(byte[] input, IDictionary<string, object> properties, bool compression = true)
4143
{
4244
if (input == null)
4345
{
4446
throw new ArgumentNullException(nameof(input));
4547
}
4648

49+
if (properties == null)
50+
properties = new Dictionary<string, object>();
51+
52+
object compressedKey = null;
53+
if (properties.TryGetValue(CompressedKey, out compressedKey))
54+
properties[CompressedKey] = compression;
55+
else
56+
properties.Add(CompressedKey, compression);
57+
4758
var props = JsonConvert.SerializeObject(properties);
48-
var propsBytes = Encoding.UTF8.GetBytes($"{props}{SocketsConstants.Splitter}");
59+
var propsBytes = Encoding.UTF8.GetBytes($"{props}{Splitter}");
4960

5061
var bytesCount = input.Length;
5162
input = propsBytes.Concat(input).ToArray();
5263

53-
return await _compressor.CompressAsync(input);
64+
if (compression)
65+
return await _compressor.CompressAsync(input);
66+
67+
return input;
5468
}
5569

5670
private async Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor descriptor)
@@ -142,15 +156,14 @@ public async Task BroadcastAsync(WebSocketMessageContext context)
142156
}
143157
}
144158

145-
public async Task BroadcastBinaryAsync(byte[] inputs, JsonObject properties)
159+
public async Task BroadcastBinaryAsync(byte[] inputs, IDictionary<string, object> properties, bool compression = true)
146160
{
147161
if (!Connections.Any())
148162
{
149163
return;
150164
}
151165

152-
var bytes = await PrepareBytesAsync(inputs, properties);
153-
var buffer = new byte[SocketsConstants.ChunkSize];
166+
var bytes = await PrepareBytesAsync(inputs, properties, compression);
154167

155168
using (var ms = new MemoryStream(bytes))
156169
{
@@ -159,10 +172,10 @@ public async Task BroadcastBinaryAsync(byte[] inputs, JsonObject properties)
159172
byte[] chunkedBytes = null;
160173
do
161174
{
162-
chunkedBytes = br.ReadBytes(SocketsConstants.ChunkSize);
175+
chunkedBytes = br.ReadBytes(ChunkSize);
163176
var endOfMessage = false;
164177

165-
if (chunkedBytes.Length < SocketsConstants.ChunkSize)
178+
if (chunkedBytes.Length < ChunkSize)
166179
endOfMessage = true;
167180

168181
foreach (var connection in Connections)
@@ -173,9 +186,9 @@ public async Task BroadcastBinaryAsync(byte[] inputs, JsonObject properties)
173186
if (endOfMessage)
174187
break;
175188

176-
} while (chunkedBytes.Length <= SocketsConstants.ChunkSize);
189+
} while (chunkedBytes.Length <= ChunkSize);
177190
}
178-
}
191+
}
179192
}
180193

181194
public async Task SendAsync(string connectionId, WebSocketMessageContext context)
@@ -202,7 +215,7 @@ public async Task SendAsync(string connectionId, WebSocketMessageContext context
202215
await SendAsync(transport, descriptor);
203216
}
204217

205-
public async Task SendBinaryAsync(string connectionId, byte[] input, JsonObject properties)
218+
public async Task SendBinaryAsync(string connectionId, byte[] input, IDictionary<string, object> properties, bool compression = true)
206219
{
207220
if (!Connections.Any())
208221
{
@@ -215,21 +228,20 @@ public async Task SendBinaryAsync(string connectionId, byte[] input, JsonObject
215228
throw new ArgumentOutOfRangeException(nameof(transport));
216229
}
217230

218-
byte[] bytes = await PrepareBytesAsync(input, properties);
231+
byte[] bytes = await PrepareBytesAsync(input, properties, compression);
219232

220-
var buffer = new byte[SocketsConstants.ChunkSize];
221233
using (var ms = new MemoryStream(bytes))
222234
{
223235
using (BinaryReader br = new BinaryReader(ms))
224236
{
225237
byte[] chunkBytes = null;
226238
do
227239
{
228-
chunkBytes = br.ReadBytes(SocketsConstants.ChunkSize);
240+
chunkBytes = br.ReadBytes(ChunkSize);
229241
var segments = new ArraySegment<byte>(chunkBytes);
230242
var endOfMessage = false;
231243

232-
if (chunkBytes.Length < SocketsConstants.ChunkSize)
244+
if (chunkBytes.Length < ChunkSize)
233245
endOfMessage = true;
234246

235247
await transport.WebSocket.SendAsync(segments,
@@ -240,7 +252,7 @@ await transport.WebSocket.SendAsync(segments,
240252
if (endOfMessage)
241253
break;
242254

243-
} while (chunkBytes.Length <= SocketsConstants.ChunkSize);
255+
} while (chunkBytes.Length <= ChunkSize);
244256
}
245257
}
246258
}
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
using System.Threading.Tasks;
1+
using System.Collections;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
24

35
namespace NetCoreStack.WebSockets
46
{
57
public class DefaultHandshakeStateTransport : IHandshakeStateTransport
68
{
7-
public Task<object> GetStateAsync()
9+
public Task<IDictionary<string, object>> GetStateAsync()
810
{
9-
return Task.FromResult<object>(0);
11+
var dictionary = new Dictionary<string, object>();
12+
return Task.FromResult<IDictionary<string, object>>(dictionary);
1013
}
1114
}
1215
}

src/NetCoreStack.WebSockets/Extensions/WebSocketExtensions.cs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using NetCoreStack.WebSockets.Internal;
33
using Newtonsoft.Json;
44
using System;
5+
using System.Collections.Generic;
56
using System.IO;
67
using System.Linq;
78
using System.Net.WebSockets;
@@ -44,10 +45,20 @@ public static async Task<WebSocketMessageContext> ToBinaryContextAsync(this WebS
4445
{
4546
throw new ArgumentNullException(nameof(result));
4647
}
47-
48-
var webSocketContext = new WebSocketMessageContext();
49-
var decompressedBytes = await compressor.DeCompressAsync(input);
50-
using (var ms = new MemoryStream(decompressedBytes))
48+
49+
var webSocketContext = new WebSocketMessageContext();
50+
bool compression = GZipHelper.IsGZipHeader(input);
51+
byte[] bytes = null;
52+
if (compression)
53+
{
54+
bytes = await compressor.DeCompressAsync(input);
55+
}
56+
else
57+
{
58+
bytes = input;
59+
}
60+
61+
using (var ms = new MemoryStream(bytes))
5162
using (var sr = new StreamReader(ms))
5263
{
5364
var content = await sr.ReadToEndAsync();
@@ -64,11 +75,15 @@ public static async Task<WebSocketMessageContext> ToBinaryContextAsync(this WebS
6475

6576
try
6677
{
67-
webSocketContext.State = JsonConvert.DeserializeObject<SocketObject>(parts.First());
78+
webSocketContext.State = JsonConvert.DeserializeObject<Dictionary<string, object>>(parts.First());
6879
}
69-
catch (Exception)
80+
catch (Exception ex)
7081
{
71-
webSocketContext.State = "Unknown";
82+
webSocketContext.State = new Dictionary<string, object>
83+
{
84+
["Exception"] = ex.Message,
85+
["Unknown"] = "Unknown binary message"
86+
};
7287
}
7388
}
7489

src/NetCoreStack.WebSockets/Interfaces/IConnectionManager.cs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using NetCoreStack.WebSockets.Internal;
22
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
34
using System.Net.WebSockets;
45
using System.Threading.Tasks;
56

@@ -11,14 +12,44 @@ public interface IConnectionManager
1112

1213
Task ConnectAsync(WebSocket webSocket);
1314

15+
/// <summary>
16+
/// Text message broadcaster
17+
/// </summary>
18+
/// <param name="context">Data</param>
19+
/// <returns></returns>
1420
Task BroadcastAsync(WebSocketMessageContext context);
1521

16-
Task BroadcastBinaryAsync(byte[] input, JsonObject properties);
22+
/// <summary>
23+
/// Binary message broadcaster
24+
/// </summary>
25+
/// <param name="inputs"></param>
26+
/// <param name="properties"></param>
27+
/// <param name="compression"></param>
28+
/// <returns></returns>
29+
Task BroadcastBinaryAsync(byte[] input, IDictionary<string, object> properties, bool compression = true);
1730

31+
/// <summary>
32+
/// Send text message to specified connection
33+
/// </summary>
34+
/// <param name="connectionId"></param>
35+
/// <param name="context"></param>
36+
/// <returns></returns>
1837
Task SendAsync(string connectionId, WebSocketMessageContext context);
1938

20-
Task SendBinaryAsync(string connectionId, byte[] input, JsonObject properties);
39+
/// <summary>
40+
/// Send binary message to specified connection
41+
/// </summary>
42+
/// <param name="connectionId"></param>
43+
/// <param name="input">Data</param>
44+
/// <param name="properties">Additional transport data</param>
45+
/// <param name="compression">Compression status of the data, default value is true</param>
46+
/// <returns></returns>
47+
Task SendBinaryAsync(string connectionId, byte[] input, IDictionary<string, object> properties, bool compression = true);
2148

49+
/// <summary>
50+
/// Close the specified connection
51+
/// </summary>
52+
/// <param name="connectionId"></param>
2253
void CloseConnection(string connectionId);
2354
}
2455
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
using System.Threading.Tasks;
1+
using System.Collections.Generic;
2+
using System.Threading.Tasks;
23

34
namespace NetCoreStack.WebSockets
45
{
56
public interface IHandshakeStateTransport
67
{
7-
Task<object> GetStateAsync();
8+
Task<IDictionary<string, object>> GetStateAsync();
89
}
910
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace NetCoreStack.WebSockets.Internal
2+
{
3+
public static class GZipHelper
4+
{
5+
/// <summary>
6+
/// Checks the first two bytes in a GZIP file, which must be 31 and 139.
7+
/// </summary>
8+
public static bool IsGZipHeader(byte[] arr)
9+
{
10+
return arr.Length >= 2 &&
11+
arr[0] == 31 &&
12+
arr[1] == 139;
13+
}
14+
}
15+
}

src/NetCoreStack.WebSockets/Internal/SocketsConstants.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
public class SocketsConstants
44
{
55
public const string Splitter = "<|>";
6+
public const string CompressedKey = "Compressed";
67
public const int ChunkSize = 1024 * 4;
78
}
89
}

src/NetCoreStack.WebSockets/Internal/WebSocketReceiver.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,22 @@ private async Task InternalReceiveAsync()
6161
}
6262
binaryResult = ms.ToArray();
6363
}
64-
var context = await result.ToBinaryContextAsync(_context.Compressor, binaryResult);
65-
var _invocators = _context.InvocatorRegistry.GetInvocators(context, _context.Options);
66-
foreach (var invoker in _invocators)
64+
try
6765
{
68-
await invoker.InvokeAsync(context);
66+
var context = await result.ToBinaryContextAsync(_context.Compressor, binaryResult);
67+
var _invocators = _context.InvocatorRegistry.GetInvocators(context, _context.Options);
68+
foreach (var invoker in _invocators)
69+
{
70+
await invoker.InvokeAsync(context);
71+
}
72+
}
73+
catch (Exception ex)
74+
{
75+
var logger = _context.LoggerFactory.CreateLogger<WebSocketReceiver>();
76+
logger.LogDebug(new EventId((int)WebSocketState.Aborted,
77+
nameof(WebSocketState.Aborted)),
78+
ex, "WebSocket transport exception!",
79+
_context.Options);
6980
}
7081
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
7182
}

src/NetCoreStack.WebSockets/NetCoreStack.WebSockets.xproj

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@
1111
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
1212
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
1313
<TargetFrameworkVersion>v4.6</TargetFrameworkVersion>
14+
<SccProjectName>
15+
</SccProjectName>
16+
<SccProvider>
17+
</SccProvider>
18+
<SccAuxPath>
19+
</SccAuxPath>
20+
<SccLocalPath>
21+
</SccLocalPath>
1422
</PropertyGroup>
1523
<PropertyGroup>
1624
<SchemaVersion>2.0</SchemaVersion>

src/NetCoreStack.WebSockets/Types/SocketObject.cs

Lines changed: 0 additions & 18 deletions
This file was deleted.

0 commit comments

Comments
 (0)