Skip to content

Commit 5e2ac9d

Browse files
authored
Tcp bus (#110)
* TCP bus improvements - Server-side bus routes messages to the correct client instead of broadcasting to all clients. - Allow for custom message serializers * Add some missing tests
1 parent 1274150 commit 5e2ac9d

15 files changed

+773
-430
lines changed
Lines changed: 190 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,208 @@
11
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Linq;
25
using System.Net;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Newtonsoft.Json;
9+
using ReactiveDomain.Messaging;
310
using ReactiveDomain.Messaging.Bus;
11+
using ReactiveDomain.Transport.Serialization;
12+
using Xunit;
413

514
namespace ReactiveDomain.Transport.Tests
615
{
7-
public class TcpBusClientSideTests
16+
[Collection("TCP bus tests")]
17+
public class TcpBusClientSideTests : IDisposable
818
{
9-
private IPAddress _hostAddress;
10-
private IDispatcher _commandBus;
11-
private IPAddress _clientAddress;
12-
private MockTcpConnection _clientTcpConnection;
13-
private const int CommandPort = 10660;
19+
private const string ShortProp = "abc";
20+
// 16kb is large enough to cause the transport to split up the frame.
21+
// It would be better if we did the splitting manually so we were sure it really happened.
22+
// Would require mocking more things.
23+
private readonly string _longProp = string.Join("", Enumerable.Repeat("a", 16 * 1024));
24+
25+
private readonly Dispatcher _localBus = new Dispatcher("local");
26+
private readonly IList<IDisposable> _subscriptions = new List<IDisposable>();
27+
private readonly TcpBusServerSide _tcpBusServerSide;
28+
private readonly TcpBusClientSide _tcpBusClientSide;
29+
private readonly TaskCompletionSource<IMessage> _tcs;
1430

1531
public TcpBusClientSideTests()
1632
{
17-
_commandBus = new Dispatcher("TestBus");
18-
_hostAddress = IPAddress.Loopback;
19-
_clientAddress = IPAddress.Loopback;
20-
_clientTcpConnection = MockTcpConnection.CreateConnectingTcpConnection(Guid.NewGuid(),
21-
new IPEndPoint(_hostAddress, CommandPort),
22-
new TcpClientConnector(),
23-
TimeSpan.FromSeconds(120),
24-
conn =>
25-
{
26-
},
27-
(conn, err) =>
28-
{
29-
},
30-
verbose: true);
33+
var hostAddress = IPAddress.Loopback;
34+
var port = 10000;
35+
_tcs = new TaskCompletionSource<IMessage>();
36+
37+
// server side
38+
var serverInbound = new QueuedHandler(
39+
new AdHocHandler<IMessage>(m => { if (m is Command cmd) _localBus.TrySend(cmd, out _); }),
40+
"InboundMessageServerHandler",
41+
true,
42+
TimeSpan.FromMilliseconds(1000));
43+
44+
_tcpBusServerSide = new TcpBusServerSide(
45+
hostAddress,
46+
port,
47+
inboundNondiscardingMessageTypes: new[] { typeof(WoftamCommand) },
48+
inboundNondiscardingMessageQueuedHandler: serverInbound);
49+
50+
_localBus.SubscribeToAll(_tcpBusServerSide);
51+
52+
serverInbound.Start();
53+
54+
// client side
55+
var clientInbound = new QueuedHandler(
56+
new AdHocHandler<IMessage>(_tcs.SetResult),
57+
"InboundMessageQueuedHandler",
58+
true,
59+
TimeSpan.FromMilliseconds(1000));
60+
61+
_tcpBusClientSide = new TcpBusClientSide(
62+
hostAddress,
63+
port,
64+
inboundNondiscardingMessageTypes: new[] { typeof(CommandResponse) },
65+
inboundNondiscardingMessageQueuedHandler: clientInbound,
66+
messageSerializers: new Dictionary<Type, IMessageSerializer>
67+
{ { typeof(WoftamCommandResponse), new WoftamCommandResponse.Serializer() } });
68+
69+
clientInbound.Start();
70+
71+
// wait for tcp connection to be established (maybe an api to detect this would be nice)
72+
Thread.Sleep(TimeSpan.FromMilliseconds(200));
73+
}
74+
75+
[Fact]
76+
public void can_send_command()
77+
{
78+
var handler = new WoftamCommandHandler(_longProp);
79+
_subscriptions.Add(_localBus.Subscribe(handler));
80+
81+
// First send the command to server so it knows where to send the response.
82+
_tcpBusClientSide.Handle(MessageBuilder.New(() => new WoftamCommand(ShortProp)));
83+
84+
// expect to receive it on the client side
85+
var gotMessage = _tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
86+
Assert.True(gotMessage);
87+
Assert.IsType<Success>(_tcs.Task.Result);
88+
}
89+
90+
[Fact]
91+
public void can_handle_split_frames() // Also tests custom deserializer
92+
{
93+
var handler = new WoftamCommandHandler(_longProp) { ReturnCustomResponse = true };
94+
_subscriptions.Add(_localBus.Subscribe(handler));
95+
96+
// First send the command to server so it knows where to send the response.
97+
// We don't need this properties to be large since we're only testing message
98+
// splitting from server to client.
99+
_tcpBusClientSide.Handle(MessageBuilder.New(() => new WoftamCommand(ShortProp)));
100+
101+
// expect to receive it on the client side
102+
var gotMessage = _tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
103+
Assert.True(gotMessage);
104+
var response = Assert.IsType<WoftamCommandResponse>(_tcs.Task.Result);
105+
Assert.Equal(_longProp, response.PropertyA);
106+
}
31107

108+
public void Dispose()
109+
{
110+
foreach (var subscription in _subscriptions)
111+
{
112+
subscription.Dispose();
113+
}
114+
_localBus?.Dispose();
115+
_tcpBusClientSide.Dispose();
116+
_tcpBusServerSide.Dispose();
117+
}
118+
}
119+
120+
public class WoftamCommand : Command
121+
{
122+
public readonly string Property1;
123+
124+
public WoftamCommand(string property1)
125+
{
126+
Property1 = property1;
127+
}
128+
}
129+
130+
public class WoftamCommandResponse : Success
131+
{
132+
public readonly string PropertyA;
133+
134+
public WoftamCommandResponse(WoftamCommand source, string propertyA)
135+
: base(source)
136+
{
137+
PropertyA = propertyA;
32138
}
33-
~TcpBusClientSideTests()
139+
140+
public class Serializer : IMessageSerializer
34141
{
35-
_clientTcpConnection.Close("I'm done.");
142+
public IMessage DeserializeMessage(string json, Type messageType)
143+
{
144+
var reader = new JsonTextReader(new StringReader(json));
145+
var propA = "";
146+
var correlationId = Guid.Empty;
147+
var causationId = Guid.Empty;
148+
WoftamCommand sourceCommand = null;
149+
while (reader.Read())
150+
{
151+
if (reader.TokenType == JsonToken.PropertyName)
152+
{
153+
if (reader.Value.ToString() == nameof(PropertyA))
154+
{
155+
reader.Read();
156+
propA = reader.Value.ToString();
157+
}
158+
else if (reader.Value.ToString() == "CorrelationId")
159+
{
160+
reader.Read();
161+
correlationId = Guid.Parse(reader.Value.ToString());
162+
}
163+
else if (reader.Value.ToString() == "CausationId")
164+
{
165+
reader.Read();
166+
causationId = Guid.Parse(reader.Value.ToString());
167+
}
168+
else if (reader.Value.ToString() == "SourceCommand")
169+
{
170+
reader.Read();
171+
var serializer = new JsonSerializer();
172+
sourceCommand = serializer.Deserialize<WoftamCommand>(reader);
173+
break;
174+
}
175+
}
176+
}
177+
if (sourceCommand is null)
178+
throw new JsonSerializationException("Could not deserialize WoftamCommandResponse.");
179+
var response = new WoftamCommandResponse(sourceCommand, propA);
180+
if (correlationId != Guid.Empty)
181+
response.CorrelationId = correlationId;
182+
if (causationId != Guid.Empty)
183+
response.CausationId = causationId;
184+
return response;
185+
}
186+
187+
public string SerializeMessage(IMessage message)
188+
{
189+
return JsonConvert.SerializeObject(message, Json.JsonSettings);
190+
}
36191
}
192+
}
37193

38-
// Sigh... at this point, there are no commands defined in ReactiveDomain, so I have nothing with
39-
// which to test.
40-
41-
//[Fact]
42-
//public void handle_command_test()
43-
//{
44-
// // Set up the TcpBusClientSide that I will test, and also hook up the LengthPrefixMessageFramer
45-
// var tcpBusClientSide = new TcpBusClientSide(_hostAddress, _commandBus, _clientAddress, 10000, _clientTcpConnection);
46-
// tcpBusClientSide._framer.RegisterMessageArrivedCallback(tcpBusClientSide.TcpMessageArrived);
47-
// Action<ITcpConnection, IEnumerable<ArraySegment<byte>>> callback = null;
48-
// callback = (x, data) =>
49-
// {
50-
// tcpBusClientSide._framer.UnFrameData(data);
51-
// _clientTcpConnection.ReceiveAsync(callback);
52-
// };
53-
// _clientTcpConnection.ReceiveAsync(callback);
54-
55-
// _clientTcpConnection.SentData = null;
56-
// var cmd = new ImageProcess.Decolorize(true, Guid.NewGuid(), null);
57-
58-
// var cmdResponse = tcpBusClientSide.Handle(cmd);
59-
60-
// var expectedSentData = tcpBusClientSide._framer.FrameData((new TcpMessage(cmd).AsArraySegment()));
61-
// var expectedCmdResponse = cmd.Succeed();
62-
// Assert.NotNull(_clientTcpConnection.SentData);
63-
// Assert.Equal(expectedSentData.ToArray(), _clientTcpConnection.SentData.ToArray());
64-
// Assert.NotNull(cmdResponse);
65-
// Assert.Equal(expectedCmdResponse.Succeeded, cmdResponse.Succeeded);
66-
// Assert.Equal(expectedCmdResponse.Error, cmdResponse.Error);
67-
//}
194+
public class WoftamCommandHandler : IHandleCommand<WoftamCommand>
195+
{
196+
private readonly string _prop;
197+
public bool ReturnCustomResponse { get; set; }
68198

199+
public WoftamCommandHandler(string prop)
200+
{
201+
_prop = prop;
202+
}
203+
public CommandResponse Handle(WoftamCommand command)
204+
{
205+
return ReturnCustomResponse ? new WoftamCommandResponse(command, _prop) : command.Succeed();
206+
}
69207
}
70208
}
Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Collections.Generic;
32
using System.Linq;
43
using System.Net;
54
using System.Threading;
@@ -10,37 +9,39 @@
109

1110
namespace ReactiveDomain.Transport.Tests
1211
{
12+
[Collection("TCP bus tests")]
1313
public class TcpBusServerSideTests
1414
{
15+
private readonly IPAddress _hostAddress = IPAddress.Loopback;
16+
private int port = 10000;
17+
private readonly TaskCompletionSource<IMessage> _tcs = new TaskCompletionSource<IMessage>();
18+
1519
[Fact]
1620
public void can_handle_split_frames()
1721
{
1822
// 16kb large enough to cause the transport to split up the frame.
1923
// it would be better if we did the splitting manually so we were sure it really happened.
2024
// would require mocking more things.
21-
var hostAddress = IPAddress.Loopback;
2225
var prop1 = "prop1";
2326
var prop2 = string.Join("", Enumerable.Repeat("a", 16 * 1024));
24-
var port = 10000;
25-
var tcs = new TaskCompletionSource<IMessage>();
2627

2728
// server side
2829
var serverInbound = new QueuedHandler(
29-
new AdHocHandler<IMessage>(tcs.SetResult),
30+
new AdHocHandler<IMessage>(_tcs.SetResult),
3031
"InboundMessageQueuedHandler",
3132
true,
3233
TimeSpan.FromMilliseconds(1000));
3334

34-
var tcpBusServerSide = new TcpBusServerSide(hostAddress, port, null)
35-
{
36-
InboundMessageQueuedHandler = serverInbound,
37-
InboundSpamMessageTypes = new List<Type>(),
38-
};
35+
var tcpBusServerSide = new TcpBusServerSide(
36+
_hostAddress,
37+
port,
38+
inboundNondiscardingMessageTypes: new[] { typeof(WoftamEvent) },
39+
inboundNondiscardingMessageQueuedHandler: serverInbound);
3940

4041
serverInbound.Start();
4142

4243
// client side
43-
var tcpBusClientSide = new TcpBusClientSide(null, hostAddress, port);
44+
var tcpBusClientSide = new TcpBusClientSide(_hostAddress, port);
4445

4546
// wait for tcp connection to be established (maybe an api to detect this would be nice)
4647
Thread.Sleep(TimeSpan.FromMilliseconds(200));
@@ -49,11 +50,49 @@ public void can_handle_split_frames()
4950
tcpBusClientSide.Handle(new WoftamEvent(prop1, prop2));
5051

5152
// expect to receive it in the server
52-
var gotMessage = tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
53+
var gotMessage = _tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
5354
Assert.True(gotMessage);
54-
var evt = Assert.IsType<WoftamEvent>(tcs.Task.Result);
55+
var evt = Assert.IsType<WoftamEvent>(_tcs.Task.Result);
5556
Assert.Equal(prop1, evt.Property1);
5657
Assert.Equal(prop2, evt.Property2);
58+
59+
tcpBusClientSide.Dispose();
60+
tcpBusServerSide.Dispose();
61+
}
62+
63+
[Fact]
64+
public void can_filter_out_message_types()
65+
{
66+
// server side
67+
var serverInbound = new QueuedHandler(
68+
new AdHocHandler<IMessage>(_tcs.SetResult),
69+
"InboundMessageQueuedHandler",
70+
true,
71+
TimeSpan.FromMilliseconds(1000));
72+
73+
var tcpBusServerSide = new TcpBusServerSide(
74+
_hostAddress,
75+
port,
76+
inboundNondiscardingMessageTypes: new[] { typeof(WoftamEvent) },
77+
inboundNondiscardingMessageQueuedHandler: serverInbound);
78+
79+
serverInbound.Start();
80+
81+
// client side
82+
var tcpBusClientSide = new TcpBusClientSide(_hostAddress, port);
83+
84+
// wait for tcp connection to be established (maybe an api to detect this would be nice)
85+
Thread.Sleep(TimeSpan.FromMilliseconds(200));
86+
87+
// put disallowed message into client
88+
tcpBusClientSide.Handle(new WoftamCommand("abc"));
89+
90+
// expect to receive it in the server but drop it on the floor
91+
var gotMessage = _tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
92+
Assert.False(gotMessage);
93+
94+
tcpBusClientSide.Dispose();
95+
tcpBusServerSide.Dispose();
5796
}
5897
}
5998
}

0 commit comments

Comments
 (0)