From ec9f256070e744048cd8e33bc5b981130bb109ae Mon Sep 17 00:00:00 2001 From: Thad House Date: Fri, 26 May 2017 12:41:20 -0700 Subject: [PATCH 1/3] Makes connections be parallel --- src/FRC.NetworkTables/Dispatcher.cs | 19 +-- src/FRC.NetworkTables/DispatcherBase.cs | 2 +- .../TcpSockets/TcpConnector.cs | 117 +++++++++++++----- 3 files changed, 101 insertions(+), 37 deletions(-) diff --git a/src/FRC.NetworkTables/Dispatcher.cs b/src/FRC.NetworkTables/Dispatcher.cs index 1600dd3..95366ea 100644 --- a/src/FRC.NetworkTables/Dispatcher.cs +++ b/src/FRC.NetworkTables/Dispatcher.cs @@ -4,6 +4,7 @@ using NetworkTables.Interfaces; using NetworkTables.TcpSockets; using NetworkTables.Logging; +using System; namespace NetworkTables { @@ -18,7 +19,7 @@ private Dispatcher() : this(Storage.Instance, Notifier.Instance) public Dispatcher(Storage storage, Notifier notifier) : base(storage, notifier) { - + } /// @@ -45,22 +46,26 @@ public void StartServer(string persistentFilename, string listenAddress, int por public void SetServer(string serverName, int port) { - SetConnector(() => TcpConnector.Connect(serverName, port, Logger.Instance, 1)); + SetConnector(() => + { + return TcpConnector.Connect(new List<(string server, int port)> { (serverName, port) }, Logger.Instance, TimeSpan.FromSeconds(1)); + }); } public void SetServer(IList servers) { - List connectors = new List(); + List<(string server, int port)> addresses = new List<(string server, int port)>(servers.Count); foreach (var server in servers) { - connectors.Add(() => TcpConnector.Connect(server.IpAddress, server.Port, Logger.Instance, 1)); + addresses.Add((server.IpAddress, server.Port)); } - SetConnector(connectors); + + SetConnector(() => TcpConnector.Connect(addresses, Logger.Instance, TimeSpan.FromSeconds(1))); } public void SetServerOverride(IPAddress address, int port) { - SetConnectorOverride(() => TcpConnector.Connect(address.ToString(), port, Logger.Instance, 1)); + SetConnectorOverride(() => TcpConnector.Connect(new List<(string server, int port)> { (address.ToString(), port) }, Logger.Instance, TimeSpan.FromSeconds(1))); } public void ClearServerOverride() @@ -68,4 +73,4 @@ public void ClearServerOverride() ClearConnectorOverride(); } } -} +} \ No newline at end of file diff --git a/src/FRC.NetworkTables/DispatcherBase.cs b/src/FRC.NetworkTables/DispatcherBase.cs index a5d4512..e41ed32 100644 --- a/src/FRC.NetworkTables/DispatcherBase.cs +++ b/src/FRC.NetworkTables/DispatcherBase.cs @@ -13,7 +13,7 @@ namespace NetworkTables { internal class DispatcherBase : IDisposable { - public delegate NtTcpClient Connector(); + public delegate IClient Connector(); public const double MinimumUpdateTime = 0.01; //100ms public const double MaximumUpdateTime = 1.0; //1 second diff --git a/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs b/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs index 4d6f8e6..274aff5 100644 --- a/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs +++ b/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs @@ -6,59 +6,117 @@ using System.Threading.Tasks; using System.Runtime.ExceptionServices; using static NetworkTables.Logging.Logger; +using Nito.AsyncEx; +using System.IO; namespace NetworkTables.TcpSockets { internal class TcpConnector { - private static bool WaitAndUnwrapException(Task task, int timeout) + public class TcpClientNt : IClient { - try + private readonly TcpClient m_client; + + internal TcpClientNt(TcpClient client) { - return task.Wait(timeout); + m_client = client; } - catch (AggregateException ex) + + public Stream GetStream() { - ExceptionDispatchInfo.Capture(ex.InnerException).Throw(); - throw ex.InnerException; + return m_client.GetStream(); + } + public EndPoint RemoteEndPoint + { + get + { + return m_client.Client.RemoteEndPoint; + } + } + public bool NoDelay + { + set + { + } + } + + public void Dispose() + { + m_client.Dispose(); } } - private static int ResolveHostName(string hostName, out IPAddress[] addr) + public static IClient Connect(IList<(string server, int port)> servers, Logger logger, TimeSpan timeout) { - try + if (servers.Count == 0) { - var entries = Dns.GetHostAddressesAsync(hostName); - var success = WaitAndUnwrapException(entries, 1000); - if (!success) + return null; + } + + TcpClient c = AsyncContext.Run(async () => { + TcpClient toReturn = null; + var clientTcp = new List(); + var clientTask = new List(); + try { - addr = null; - return 1; + for (int i = 0; i < servers.Count; i++) + { + TcpClient client = new TcpClient(); + Task connectTask = client.ConnectAsync(servers[i].server, servers[i].port); + clientTcp.Add(client); + clientTask.Add(connectTask); + } + + // 10 second timeout + var delayTask = Task.Delay(timeout); + + clientTask.Add(delayTask); + + while (clientTcp.Count != 0) + { + var finished = await Task.WhenAny(clientTask); + + var index = clientTask.IndexOf(finished); + if (finished == delayTask) + { + return null; + } + else if (finished.IsCompleted && !finished.IsFaulted && !finished.IsCanceled) + { + toReturn = clientTcp[index]; + return toReturn; + } + var remove = clientTcp[index]; + clientTcp.RemoveAt(index); + remove.Dispose(); + clientTask.RemoveAt(index); + } + return null; } - List addresses = new List(); - foreach (var ipAddress in entries.Result) + finally { - // Only allow IPV4 addresses for now - // Sockets don't all support IPV6 - if (ipAddress.AddressFamily == AddressFamily.InterNetwork) + for (int i = 0; i < clientTcp.Count; i++) { - if (!addresses.Contains(ipAddress)) + if (clientTcp[i] != toReturn) { - addresses.Add(ipAddress); + try + { + clientTcp[i].Dispose(); + } + catch (Exception e) + { + // Ignore exception + } } } } - addr = addresses.ToArray(); + }); - } - catch (SocketException e) - { - addr = null; - return (int)e.SocketErrorCode; - } - return 0; + if (c == null) return null; + return new TcpClientNt(c); } + /* public static NtTcpClient Connect(string server, int port, Logger logger, int timeout = 0) { if (ResolveHostName(server, out IPAddress[] addr) != 0) @@ -102,5 +160,6 @@ public static NtTcpClient Connect(string server, int port, Logger logger, int ti } return client; } + */ } -} +} \ No newline at end of file From 621189f445035aa6a353c96bd36245e3e88441c4 Mon Sep 17 00:00:00 2001 From: Thad House Date: Fri, 26 May 2017 13:13:22 -0700 Subject: [PATCH 2/3] Prints some errors when all IP's fail to connect --- .../TcpSockets/TcpConnector.cs | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs b/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs index 274aff5..ea32574 100644 --- a/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs +++ b/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs @@ -46,6 +46,15 @@ public void Dispose() } } + private static void PrintConnectFailList(IList<(string server, int port)> servers, Logger logger) + { + Logger.Error(logger, "Failed to connect to the following IP Addresses:"); + foreach (var item in servers) + { + Logger.Error(logger, $" Server: {item.server} Port: {item.port}"); + } + } + public static IClient Connect(IList<(string server, int port)> servers, Logger logger, TimeSpan timeout) { if (servers.Count == 0) @@ -55,7 +64,7 @@ public static IClient Connect(IList<(string server, int port)> servers, Logger l TcpClient c = AsyncContext.Run(async () => { TcpClient toReturn = null; - var clientTcp = new List(); + var clientTcp = new List<(TcpClient tcpCient, (string server, int port) remote)>(); var clientTask = new List(); try { @@ -63,7 +72,7 @@ public static IClient Connect(IList<(string server, int port)> servers, Logger l { TcpClient client = new TcpClient(); Task connectTask = client.ConnectAsync(servers[i].server, servers[i].port); - clientTcp.Add(client); + clientTcp.Add((client, servers[i])); clientTask.Add(connectTask); } @@ -79,31 +88,33 @@ public static IClient Connect(IList<(string server, int port)> servers, Logger l var index = clientTask.IndexOf(finished); if (finished == delayTask) { + PrintConnectFailList(servers, logger); return null; } else if (finished.IsCompleted && !finished.IsFaulted && !finished.IsCanceled) { - toReturn = clientTcp[index]; + toReturn = clientTcp[index].tcpCient; return toReturn; } var remove = clientTcp[index]; clientTcp.RemoveAt(index); - remove.Dispose(); + remove.tcpCient.Dispose(); clientTask.RemoveAt(index); } + PrintConnectFailList(servers, logger); return null; } finally { for (int i = 0; i < clientTcp.Count; i++) { - if (clientTcp[i] != toReturn) + if (clientTcp[i].tcpCient != toReturn) { try { - clientTcp[i].Dispose(); + clientTcp[i].tcpCient.Dispose(); } - catch (Exception e) + catch (Exception) { // Ignore exception } From d62a22a0bebe2ca32ccba462a621f5af2ad93614 Mon Sep 17 00:00:00 2001 From: Thad House Date: Tue, 24 Oct 2017 15:54:38 -0700 Subject: [PATCH 3/3] Switch to better ParallelConnection algorithm Tested on android to work better now. --- src/FRC.NetworkTables/Dispatcher.cs | 6 +- .../TcpSockets/TcpConnector.cs | 143 ++++++------------ 2 files changed, 47 insertions(+), 102 deletions(-) diff --git a/src/FRC.NetworkTables/Dispatcher.cs b/src/FRC.NetworkTables/Dispatcher.cs index 95366ea..1a92b77 100644 --- a/src/FRC.NetworkTables/Dispatcher.cs +++ b/src/FRC.NetworkTables/Dispatcher.cs @@ -48,7 +48,7 @@ public void SetServer(string serverName, int port) { SetConnector(() => { - return TcpConnector.Connect(new List<(string server, int port)> { (serverName, port) }, Logger.Instance, TimeSpan.FromSeconds(1)); + return TcpConnector.ConnectParallel(new List<(string server, int port)> { (serverName, port) }, Logger.Instance, TimeSpan.FromSeconds(3)); }); } @@ -60,12 +60,12 @@ public void SetServer(IList servers) addresses.Add((server.IpAddress, server.Port)); } - SetConnector(() => TcpConnector.Connect(addresses, Logger.Instance, TimeSpan.FromSeconds(1))); + SetConnector(() => TcpConnector.ConnectParallel(addresses, Logger.Instance, TimeSpan.FromSeconds(3))); } public void SetServerOverride(IPAddress address, int port) { - SetConnectorOverride(() => TcpConnector.Connect(new List<(string server, int port)> { (address.ToString(), port) }, Logger.Instance, TimeSpan.FromSeconds(1))); + SetConnectorOverride(() => TcpConnector.ConnectParallel(new List<(string server, int port)> { (address.ToString(), port) }, Logger.Instance, TimeSpan.FromSeconds(3))); } public void ClearServerOverride() diff --git a/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs b/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs index ea32574..fb9b6f0 100644 --- a/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs +++ b/src/FRC.NetworkTables/TcpSockets/TcpConnector.cs @@ -55,122 +55,67 @@ private static void PrintConnectFailList(IList<(string server, int port)> server } } - public static IClient Connect(IList<(string server, int port)> servers, Logger logger, TimeSpan timeout) + public static IClient ConnectParallel(IList<(string ip, int port)> conns, Logger logger, TimeSpan timeout) { - if (servers.Count == 0) - { - return null; - } - - TcpClient c = AsyncContext.Run(async () => { - TcpClient toReturn = null; - var clientTcp = new List<(TcpClient tcpCient, (string server, int port) remote)>(); - var clientTask = new List(); - try - { - for (int i = 0; i < servers.Count; i++) - { - TcpClient client = new TcpClient(); - Task connectTask = client.ConnectAsync(servers[i].server, servers[i].port); - clientTcp.Add((client, servers[i])); - clientTask.Add(connectTask); - } - - // 10 second timeout - var delayTask = Task.Delay(timeout); - - clientTask.Add(delayTask); - - while (clientTcp.Count != 0) - { - var finished = await Task.WhenAny(clientTask); - - var index = clientTask.IndexOf(finished); - if (finished == delayTask) - { - PrintConnectFailList(servers, logger); - return null; - } - else if (finished.IsCompleted && !finished.IsFaulted && !finished.IsCanceled) - { - toReturn = clientTcp[index].tcpCient; - return toReturn; - } - var remove = clientTcp[index]; - clientTcp.RemoveAt(index); - remove.tcpCient.Dispose(); - clientTask.RemoveAt(index); - } - PrintConnectFailList(servers, logger); - return null; - } - finally - { - for (int i = 0; i < clientTcp.Count; i++) - { - if (clientTcp[i].tcpCient != toReturn) - { - try - { - clientTcp[i].tcpCient.Dispose(); - } - catch (Exception) - { - // Ignore exception - } - } - } - } - }); - - if (c == null) return null; - return new TcpClientNt(c); + return ConnectParallelAsync(conns, logger, timeout).Result; } - /* - public static NtTcpClient Connect(string server, int port, Logger logger, int timeout = 0) + public static Task ConnectParallelAsync(IList<(string ip, int port)> conns, Logger logger, TimeSpan timeout) { - if (ResolveHostName(server, out IPAddress[] addr) != 0) + List clients = new List(); + List tasks = new List(); + + foreach (var item in conns) { + var client = new TcpClient(); + Task connectTask; try { - addr = new IPAddress[1]; - addr[0] = IPAddress.Parse(server); - } - catch (FormatException) - { - Error(logger, $"could not resolve {server} address"); - return null; - } - } + connectTask = client.ConnectAsync(item.ip, item.port); - //Create out client - NtTcpClient client = new NtTcpClient(AddressFamily.InterNetwork); - // No time limit, connect forever - if (timeout == 0) - { - try + } + catch (ArgumentOutOfRangeException aore) { - client.Connect(addr, port); + // TODO: Log + Logger.Error(logger, $"Bad argument {aore}"); + continue; } - catch (SocketException ex) + catch (SocketException se) { - Error(logger, $"Connect() to {server} port {port.ToString()} failed: {ex.SocketErrorCode}"); - ((IDisposable)client).Dispose(); - return null; + // TODO: Log + Logger.Warning(logger, $"Socket connect failed {se}"); + continue; } - return client; + clients.Add(client); + tasks.Add(connectTask); } - //Connect with time limit - bool connectedWithTimeout = client.ConnectWithTimeout(addr, port, logger, timeout); - if (!connectedWithTimeout) + var delayTask = Task.Delay(timeout); + tasks.Add(delayTask); + + async Task ConnectAsyncInternal() { - ((IDisposable)client).Dispose(); + while (tasks.Count > 0) + { + var task = await Task.WhenAny(tasks); + if (task == delayTask) + { + return null; + } + var index = tasks.IndexOf(task); + var client = clients[index]; + if (client.Connected) + { + return new TcpClientNt(client); + } + clients.RemoveAt(index); + tasks.RemoveAt(index); + } return null; } - return client; + + return ConnectAsyncInternal(); } - */ + } } \ No newline at end of file