Skip to content

Commit e0b375a

Browse files
authored
Merge pull request #4 from gencebay/dev
BroadCast Binary Message to All Peers
2 parents 4923761 + 663b3bb commit e0b375a

File tree

5 files changed

+79
-12
lines changed

5 files changed

+79
-12
lines changed

src/NetCoreStack.WebSockets/Internal/ConnectionManager.cs

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,20 @@ public ConnectionManager(ILoggerFactory loggerFactory)
2222
Connections = new ConcurrentDictionary<string, WebSocketTransport>();
2323
}
2424

25+
private void PrepareBytes(ref byte[] bytes, JsonObject properties)
26+
{
27+
if (bytes == null)
28+
{
29+
throw new ArgumentNullException(nameof(bytes));
30+
}
31+
32+
var props = JsonConvert.SerializeObject(properties);
33+
var propsBytes = Encoding.UTF8.GetBytes($"{SocketsConstants.Splitter}{props}");
34+
35+
var bytesCount = bytes.Length;
36+
bytes = bytes.Concat(propsBytes).ToArray();
37+
}
38+
2539
private async Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor descriptor)
2640
{
2741
if (descriptor == null)
@@ -40,6 +54,21 @@ await transport.WebSocket.SendAsync(descriptor.Segments,
4054
CancellationToken.None);
4155
}
4256

57+
private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage)
58+
{
59+
if (transport == null)
60+
{
61+
throw new ArgumentNullException(nameof(transport));
62+
}
63+
64+
var segments = new ArraySegment<byte>(chunkedBytes);
65+
66+
await transport.WebSocket.SendAsync(segments,
67+
WebSocketMessageType.Binary,
68+
endOfMessage,
69+
CancellationToken.None);
70+
}
71+
4372
public async Task BroadcastAsync(WebSocketMessageContext context)
4473
{
4574
if (context == null)
@@ -66,6 +95,37 @@ public async Task BroadcastAsync(WebSocketMessageContext context)
6695
}
6796
}
6897

98+
public async Task BroadcastBinaryAsync(byte[] bytes, JsonObject properties)
99+
{
100+
PrepareBytes(ref bytes, properties);
101+
102+
var buffer = new byte[SocketsConstants.ChunkSize];
103+
using (var ms = new MemoryStream(bytes))
104+
{
105+
using (BinaryReader br = new BinaryReader(ms))
106+
{
107+
byte[] chunkedBytes = null;
108+
do
109+
{
110+
chunkedBytes = br.ReadBytes(SocketsConstants.ChunkSize);
111+
var endOfMessage = false;
112+
113+
if (chunkedBytes.Length < SocketsConstants.ChunkSize)
114+
endOfMessage = true;
115+
116+
foreach (var connection in Connections)
117+
{
118+
await SendBinaryAsync(transport: connection.Value, chunkedBytes: chunkedBytes, endOfMessage: endOfMessage);
119+
}
120+
121+
if (endOfMessage)
122+
break;
123+
124+
} while (chunkedBytes.Length <= SocketsConstants.ChunkSize);
125+
}
126+
}
127+
}
128+
69129
public async Task SendAsync(string connectionId, WebSocketMessageContext context)
70130
{
71131
WebSocketTransport transport = null;
@@ -93,11 +153,7 @@ public async Task SendBinaryAsync(string connectionId, byte[] bytes, JsonObject
93153
throw new ArgumentOutOfRangeException(nameof(transport));
94154
}
95155

96-
var props = JsonConvert.SerializeObject(properties);
97-
var propsBytes = Encoding.UTF8.GetBytes($"{SocketsConstants.Splitter}{props}");
98-
99-
var bytesCount = bytes.Length;
100-
bytes = bytes.Concat(propsBytes).ToArray();
156+
PrepareBytes(ref bytes, properties);
101157

102158
var buffer = new byte[SocketsConstants.ChunkSize];
103159
using (var ms = new MemoryStream(bytes))

src/NetCoreStack.WebSockets/Internal/IConnectionManager.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@ public interface IConnectionManager
77
{
88
Task BroadcastAsync(WebSocketMessageContext context);
99

10+
Task BroadcastBinaryAsync(byte[] bytes, JsonObject properties);
11+
1012
Task SendAsync(string connectionId, WebSocketMessageContext context);
1113

1214
Task SendBinaryAsync(string connectionId, byte[] bytes, JsonObject properties);
1315

1416
Task SendAsync(string connectionId, WebSocketMessageContext context, WebSocket webSocket);
17+
1518
void CloseConnection(string connectionId);
1619
}
1720
}

test/ServerTestApp/Controllers/DiscoveryController.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,34 @@ public IActionResult Get()
3838
[HttpPost(nameof(SendAsync))]
3939
public async Task<IActionResult> SendAsync([FromBody]SimpleModel model)
4040
{
41-
var echo = $"Echo from server '{model.Message}' - {DateTime.Now}";
41+
var echo = $"Echo from server '{model.Key}' - {DateTime.Now}";
4242
var obj = new { message = echo };
4343
var webSocketContext = new WebSocketMessageContext { Command = WebSocketCommands.DataSend, Value = obj };
4444
await _connectionManager.BroadcastAsync(webSocketContext);
4545
return Ok();
4646
}
4747

48+
[HttpPost(nameof(BrodcastBinaryAsync))]
49+
public async Task<IActionResult> BrodcastBinaryAsync([FromBody]SimpleModel model)
50+
{
51+
var bytes = _distrubutedCache.Get(model.Key);
52+
await _connectionManager.BroadcastBinaryAsync(bytes, new SocketObject { Key = model.Key });
53+
return Ok();
54+
}
55+
4856
[HttpPost(nameof(SendBinaryAsync))]
4957
public async Task<IActionResult> SendBinaryAsync([FromBody]SimpleModel model)
5058
{
51-
var bytes = _distrubutedCache.Get(model.Message);
52-
await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Message });
59+
var bytes = _distrubutedCache.Get(model.Key);
60+
await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Key });
5361
return Ok();
5462
}
5563

5664
[HttpPost(nameof(SendBinaryFromMemoryAsync))]
5765
public async Task<IActionResult> SendBinaryFromMemoryAsync([FromBody]SimpleModel model)
5866
{
59-
var bytes = (byte[])_memoryCache.Get(model.Message);
60-
await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Message });
67+
var bytes = (byte[])_memoryCache.Get(model.Key);
68+
await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Key });
6169
return Ok();
6270
}
6371
}

test/ServerTestApp/Models/SimpleModel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
public class SimpleModel
44
{
55
public string ConnectionId { get; set; }
6-
public string Message { get; set; }
6+
public string Key { get; set; }
77
}
88
}

test/ServerTestApp/Startup.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void ConfigureServices(IServiceCollection services)
4444
services.AddDistributedRedisCache(options =>
4545
{
4646
options.Configuration = "localhost";
47-
options.InstanceName = "SocketsInstance";
47+
options.InstanceName = "RedisInstance";
4848
});
4949

5050
services.AddTransient<IHandshakeStateTransport, MyHandshakeStateTransport>();

0 commit comments

Comments
 (0)