Skip to content

Commit da4b67f

Browse files
committed
RawSocket pub/sub optimization
Serializing the message once for all the subscribers, even for RawSocket transport
1 parent 14f5db1 commit da4b67f

File tree

10 files changed

+86
-16
lines changed

10 files changed

+86
-16
lines changed

src/net45/Default/WampSharp.NewtonsoftJson/Newtonsoft/JTokenMessageParser.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.IO;
3+
using System.Text;
34
using WampSharp.Logging;
45
using Newtonsoft.Json;
56
using Newtonsoft.Json.Linq;
@@ -49,6 +50,11 @@ public string Format(WampMessage<object> message)
4950
return result;
5051
}
5152

53+
public byte[] GetBytes(string raw)
54+
{
55+
return Encoding.UTF8.GetBytes(raw);
56+
}
57+
5258
public WampMessage<JToken> Parse(Stream stream)
5359
{
5460
try

src/net45/Default/WampSharp.NewtonsoftMsgpack/MsgPack/MessagePackParser.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.IO;
3+
using System.Text;
34
using WampSharp.Logging;
45
using Newtonsoft.Json;
56
using Newtonsoft.Json.Linq;
@@ -90,6 +91,11 @@ public byte[] Format(WampMessage<object> message)
9091
}
9192
}
9293

94+
public byte[] GetBytes(byte[] raw)
95+
{
96+
return raw;
97+
}
98+
9399
public WampMessage<JToken> Parse(Stream stream)
94100
{
95101
try

src/net45/Extensions/WampSharp.AspNetCore.RawSocket/AspNetCoreRawSocketTransport.cs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@ public class AspNetCoreRawSocketTransport : TextBinaryTransport<SocketData>
1616
private Func<ConnectionContext, Func<Task>, Task> mHandler;
1717
private readonly Handshaker mHandshaker = new Handshaker();
1818

19-
public byte MaxSize
20-
{
21-
get;
22-
}
19+
public byte MaxSize { get; }
2320

2421
public AspNetCoreRawSocketTransport(IConnectionBuilder app,
2522
byte maxSize = 15,
@@ -61,22 +58,27 @@ protected override string GetSubProtocol(SocketData connection)
6158
return connection.HandshakeResponse.SerializerType.GetSubProtocol();
6259
}
6360

64-
protected override IWampConnection<TMessage> CreateBinaryConnection<TMessage>
65-
(SocketData connection,
66-
IWampBinaryBinding<TMessage> binding)
61+
protected override IWampConnection<TMessage> CreateBinaryConnection<TMessage>(SocketData connection,
62+
IWampBinaryBinding<TMessage>
63+
binding)
6764
{
6865
return CreateConnection(connection, binding);
6966
}
7067

71-
protected override IWampConnection<TMessage> CreateTextConnection<TMessage>
72-
(SocketData connection,
73-
IWampTextBinding<TMessage> binding)
68+
protected override IWampConnection<TMessage> CreateTextConnection<TMessage>(SocketData connection,
69+
IWampTextBinding<TMessage> binding)
7470
{
7571
return CreateConnection(connection, binding);
7672
}
7773

78-
private IWampConnection<TMessage> CreateConnection<TMessage>(SocketData connection, IWampStreamingMessageParser<TMessage> binding)
74+
private IWampConnection<TMessage> CreateConnection<TMessage, TRaw>(SocketData connection,
75+
IWampTransportBinding<TMessage, TRaw> binding)
7976
{
77+
if (binding.ComputeBytes == null)
78+
{
79+
binding.ComputeBytes = true;
80+
}
81+
8082
return new RawSocketConnection<TMessage>(connection, binding, mAutoPingInterval);
8183
}
8284

src/net45/Extensions/WampSharp.RawSocket/RawSocket/RawSocketTransport.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,13 @@ protected override IWampConnection<TMessage> CreateTextConnection<TMessage>
144144
return CreateConnection(connection, binding);
145145
}
146146

147-
private TcpClientConnection<TMessage> CreateConnection<TMessage>(RawSocketTcpClient connection, IWampStreamingMessageParser<TMessage> binding)
147+
private TcpClientConnection<TMessage> CreateConnection<TMessage, TRaw>(RawSocketTcpClient connection, IWampTransportBinding<TMessage, TRaw> binding)
148148
{
149+
if (binding.ComputeBytes == null)
150+
{
151+
binding.ComputeBytes = true;
152+
}
153+
149154
return new TcpClientConnection<TMessage>
150155
(connection.Client,
151156
connection.Stream,

src/net45/WampSharp/WAMP2/V2/Authentication/Host/WampAuthenticationBinaryBinding.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public byte[] Format(WampMessage<object> message)
2828
return mBinding.Format(message);
2929
}
3030

31+
public byte[] GetBytes(byte[] raw)
32+
{
33+
return mBinding.GetBytes(raw);
34+
}
35+
3136
public WampMessage<TMessage> Parse(Stream stream)
3237
{
3338
return mBinding.Parse(stream);
@@ -37,5 +42,11 @@ public void Format(WampMessage<object> message, Stream stream)
3742
{
3843
mBinding.Format(message, stream);
3944
}
45+
46+
public bool? ComputeBytes
47+
{
48+
get => mBinding.ComputeBytes;
49+
set => mBinding.ComputeBytes = value;
50+
}
4051
}
4152
}

src/net45/WampSharp/WAMP2/V2/Authentication/Host/WampAuthenticationTextBinding.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ public string Format(WampMessage<object> message)
2828
return mBinding.Format(message);
2929
}
3030

31+
public byte[] GetBytes(string raw)
32+
{
33+
return mBinding.GetBytes(raw);
34+
}
35+
3136
public WampMessage<TMessage> Parse(Stream stream)
3237
{
3338
return mBinding.Parse(stream);
@@ -37,5 +42,11 @@ public void Format(WampMessage<object> message, Stream stream)
3742
{
3843
mBinding.Format(message, stream);
3944
}
45+
46+
public bool? ComputeBytes
47+
{
48+
get => mBinding.ComputeBytes;
49+
set => mBinding.ComputeBytes = value;
50+
}
4051
}
4152
}

src/net45/WampSharp/WAMP2/V2/Binding/IWampTransportBinding.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ public interface IWampTransportBinding<TMessage, TRaw> :
1111
IWampBinding<TMessage>,
1212
IWampMessageParser<TMessage, TRaw>
1313
{
14+
bool? ComputeBytes { get; set; }
1415
}
1516
}

src/net45/WampSharp/WAMP2/V2/Binding/Messages/RawMessage.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,7 @@ public RawMessage(WampMessage<object> other) : base(other)
99
}
1010

1111
public TRaw Raw { get; set; }
12+
13+
public byte[] Bytes { get; set; }
1214
}
1315
}

src/net45/WampSharp/WAMP2/V2/Binding/Parsers/IWampMessageParser.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,12 @@ public interface IWampMessageParser<TMessage, TRaw> : IWampStreamingMessageParse
2121
/// <param name="message">The given <see cref="WampMessage{TMessage}"/>.</param>
2222
/// <returns>A raw format representing the given <see cref="WampMessage{TMessage}"/>.</returns>
2323
TRaw Format(WampMessage<object> message);
24+
25+
/// <summary>
26+
/// Serializes a raw message to bytes.
27+
/// </summary>
28+
/// <param name="raw">The given raw message.</param>
29+
/// <returns>Bytes representing the binary format of the given raw message.</returns>
30+
byte[] GetBytes(TRaw raw);
2431
}
2532
}

src/net45/WampSharp/WAMP2/V2/Binding/WampTransportBinding.cs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,28 @@ public TRaw Format(WampMessage<object> message)
3333
return textMessage.Raw;
3434
}
3535

36+
public byte[] GetBytes(TRaw raw)
37+
{
38+
return mParser.GetBytes(raw);
39+
}
40+
3641
public override WampMessage<object> GetRawMessage(WampMessage<object> message)
3742
{
3843
return GetFormattedMessage(message);
3944
}
4045

4146
private RawMessage<TRaw> GetFormattedMessage(WampMessage<object> message)
4247
{
43-
RawMessage<TRaw> result = message as RawMessage<TRaw>;
4448

45-
if (result == null)
49+
if (!(message is RawMessage<TRaw> result))
4650
{
4751
result = new RawMessage<TRaw>(message);
4852
result.Raw = mParser.Format(message);
53+
54+
if (ComputeBytes == true)
55+
{
56+
result.Bytes = mParser.GetBytes(result.Raw);
57+
}
4958
}
5059

5160
return result;
@@ -58,8 +67,18 @@ public WampMessage<TMessage> Parse(Stream stream)
5867

5968
public void Format(WampMessage<object> message, Stream stream)
6069
{
61-
// TODO: You know! reuse the RawMessage if possible :)
62-
mParser.Format(message, stream);
70+
if (ComputeBytes == true &&
71+
message is RawMessage<TRaw> casted)
72+
{
73+
byte[] bytes = casted.Bytes;
74+
stream.Write(bytes, 0, bytes.Length);
75+
}
76+
else
77+
{
78+
mParser.Format(message, stream);
79+
}
6380
}
81+
82+
public bool? ComputeBytes { get; set; }
6483
}
6584
}

0 commit comments

Comments
 (0)