Skip to content

Commit 661b53b

Browse files
committed
Makes connections be parallel
1 parent b6f8fd3 commit 661b53b

File tree

3 files changed

+99
-38
lines changed

3 files changed

+99
-38
lines changed

src/FRC.NetworkTables/Dispatcher.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using NetworkTables.Interfaces;
55
using NetworkTables.TcpSockets;
66
using NetworkTables.Logging;
7+
using System;
78

89
namespace NetworkTables
910
{
@@ -18,7 +19,7 @@ private Dispatcher() : this(Storage.Instance, Notifier.Instance)
1819
public Dispatcher(Storage storage, Notifier notifier)
1920
: base(storage, notifier)
2021
{
21-
22+
2223
}
2324

2425
/// <summary>
@@ -45,27 +46,31 @@ public void StartServer(string persistentFilename, string listenAddress, int por
4546

4647
public void SetServer(string serverName, int port)
4748
{
48-
SetConnector(() => TcpConnector.Connect(serverName, port, Logger.Instance, 1));
49+
SetConnector(() =>
50+
{
51+
return TcpConnector.Connect(new List<(string server, int port)> { (serverName, port) }, Logger.Instance, TimeSpan.FromSeconds(1));
52+
});
4953
}
5054

5155
public void SetServer(IList<NtIPAddress> servers)
5256
{
53-
List<Connector> connectors = new List<Connector>();
57+
List<(string server, int port)> addresses = new List<(string server, int port)>(servers.Count);
5458
foreach (var server in servers)
5559
{
56-
connectors.Add(() => TcpConnector.Connect(server.IpAddress, server.Port, Logger.Instance, 1));
60+
addresses.Add((server.IpAddress, server.Port));
5761
}
58-
SetConnector(connectors);
62+
63+
SetConnector(() => TcpConnector.Connect(addresses, Logger.Instance, TimeSpan.FromSeconds(1)));
5964
}
6065

6166
public void SetServerOverride(IPAddress address, int port)
6267
{
63-
SetConnectorOverride(() => TcpConnector.Connect(address.ToString(), port, Logger.Instance, 1));
68+
SetConnectorOverride(() => TcpConnector.Connect(new List<(string server, int port)> { (address.ToString(), port) }, Logger.Instance, TimeSpan.FromSeconds(1)));
6469
}
6570

6671
public void ClearServerOverride()
6772
{
6873
ClearConnectorOverride();
6974
}
7075
}
71-
}
76+
}

src/FRC.NetworkTables/DispatcherBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace NetworkTables
1313
{
1414
internal class DispatcherBase : IDisposable
1515
{
16-
public delegate NtTcpClient Connector();
16+
public delegate IClient Connector();
1717

1818
public const double MinimumUpdateTime = 0.01; //100ms
1919
public const double MaximumUpdateTime = 1.0; //1 second

src/FRC.NetworkTables/TcpSockets/TcpConnector.cs

Lines changed: 86 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,59 +6,114 @@
66
using System.Threading.Tasks;
77
using System.Runtime.ExceptionServices;
88
using static NetworkTables.Logging.Logger;
9+
using Nito.AsyncEx;
10+
using System.IO;
911

1012
namespace NetworkTables.TcpSockets
1113
{
1214
internal class TcpConnector
1315
{
14-
private static bool WaitAndUnwrapException(Task task, int timeout)
16+
public class TcpClientNt : IClient
1517
{
16-
try
18+
private readonly TcpClient m_client;
19+
20+
internal TcpClientNt(TcpClient client)
1721
{
18-
return task.Wait(timeout);
22+
m_client = client;
1923
}
20-
catch (AggregateException ex)
24+
25+
public Stream GetStream()
2126
{
22-
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
23-
throw ex.InnerException;
27+
return m_client.GetStream();
28+
}
29+
public EndPoint RemoteEndPoint
30+
{
31+
get
32+
{
33+
return m_client.Client.RemoteEndPoint;
34+
}
35+
}
36+
public bool NoDelay
37+
{
38+
set
39+
{
40+
}
41+
}
42+
43+
public void Dispose()
44+
{
45+
m_client.Dispose();
2446
}
2547
}
2648

27-
private static int ResolveHostName(string hostName, out IPAddress[] addr)
49+
public static IClient Connect(IList<(string server, int port)> servers, Logger logger, TimeSpan timeout)
2850
{
29-
try
51+
if (servers.Count == 0)
3052
{
31-
var entries = Dns.GetHostAddressesAsync(hostName);
32-
var success = WaitAndUnwrapException(entries, 1000);
33-
if (!success)
53+
return null;
54+
}
55+
56+
return new TcpClientNt(AsyncContext.Run(async () => {
57+
TcpClient toReturn = null;
58+
var clientTcp = new List<TcpClient>();
59+
var clientTask = new List<Task>();
60+
try
3461
{
35-
addr = null;
36-
return 1;
62+
for (int i = 0; i < servers.Count; i++)
63+
{
64+
TcpClient client = new TcpClient();
65+
Task connectTask = client.ConnectAsync(servers[i].server, servers[i].port);
66+
clientTcp.Add(client);
67+
clientTask.Add(connectTask);
68+
}
69+
70+
// 10 second timeout
71+
var delayTask = Task.Delay(timeout);
72+
73+
clientTask.Add(delayTask);
74+
75+
while (clientTcp.Count != 0)
76+
{
77+
var finished = await Task.WhenAny(clientTask);
78+
79+
var index = clientTask.IndexOf(finished);
80+
if (finished == delayTask)
81+
{
82+
return null;
83+
}
84+
else if (finished.IsCompleted && !finished.IsFaulted && !finished.IsCanceled)
85+
{
86+
toReturn = clientTcp[index];
87+
return toReturn;
88+
}
89+
var remove = clientTcp[index];
90+
clientTcp.RemoveAt(index);
91+
remove.Dispose();
92+
clientTask.RemoveAt(index);
93+
}
94+
return null;
3795
}
38-
List<IPAddress> addresses = new List<IPAddress>();
39-
foreach (var ipAddress in entries.Result)
96+
finally
4097
{
41-
// Only allow IPV4 addresses for now
42-
// Sockets don't all support IPV6
43-
if (ipAddress.AddressFamily == AddressFamily.InterNetwork)
98+
for (int i = 0; i < clientTcp.Count; i++)
4499
{
45-
if (!addresses.Contains(ipAddress))
100+
if (clientTcp[i] != toReturn)
46101
{
47-
addresses.Add(ipAddress);
102+
try
103+
{
104+
clientTcp[i].Dispose();
105+
}
106+
catch (Exception e)
107+
{
108+
// Ignore exception
109+
}
48110
}
49111
}
50112
}
51-
addr = addresses.ToArray();
52-
53-
}
54-
catch (SocketException e)
55-
{
56-
addr = null;
57-
return (int)e.SocketErrorCode;
58-
}
59-
return 0;
113+
}));
60114
}
61115

116+
/*
62117
public static NtTcpClient Connect(string server, int port, Logger logger, int timeout = 0)
63118
{
64119
if (ResolveHostName(server, out IPAddress[] addr) != 0)
@@ -102,5 +157,6 @@ public static NtTcpClient Connect(string server, int port, Logger logger, int ti
102157
}
103158
return client;
104159
}
160+
*/
105161
}
106-
}
162+
}

0 commit comments

Comments
 (0)