diff --git a/CatCore/CatCoreInstance.cs b/CatCore/CatCoreInstance.cs index ebdb677..32b07e9 100644 --- a/CatCore/CatCoreInstance.cs +++ b/CatCore/CatCoreInstance.cs @@ -12,11 +12,14 @@ using CatCore.Services; using CatCore.Services.Interfaces; using CatCore.Services.Multiplexer; +using CatCore.Services.Sockets; +using CatCore.Services.Sockets.Packets; using CatCore.Services.Twitch; using CatCore.Services.Twitch.Interfaces; using CatCore.Services.Twitch.Media; using DryIoc; using JetBrains.Annotations; +using ImTools; using Serilog; using Serilog.Events; using Serilog.Formatting.Display; @@ -153,6 +156,32 @@ private void CreateContainer() _container.Register(Reuse.Singleton); _container.Register(Reuse.Singleton); + // Register socket services + _container.Register(Reuse.Singleton); + _container.RegisterInitializer((service, context) => service.Initialize()); + + _ = Task.Run(() => + { + var socket = _container.Resolve(); + socket.Initialize(); + socket.OnConnect += clientSocket => + { + Log.Logger.Information($"Client connected from {clientSocket.WorkSocket.RemoteEndPoint} and {clientSocket.Uuid}"); + }; + + socket.OnReceive += (clientSocket, data, str) => + { + Log.Logger.Information($"Received from {clientSocket.Uuid}: {str}"); + + clientSocket.QueueSend(new RespondHello(str)); + }; + + socket.OnDisconnect += clientSocket => + { + Log.Logger.Information($"Client disconnected from {clientSocket.WorkSocket.RemoteEndPoint} and {clientSocket.Uuid}"); + }; + }); + // Spin up internal web api service _ = Task.Run(() => { diff --git a/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs b/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs new file mode 100644 index 0000000..6af6983 --- /dev/null +++ b/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs @@ -0,0 +1,19 @@ +using System; +using CatCore.Services.Sockets; +using CatCore.Services.Sockets.Packets; + +namespace CatCore.Services.Interfaces +{ + internal interface IKittenRawSocketProvider : INeedInitialization, IDisposable + { + bool isServerRunning(); + +#pragma warning disable 649 + event Action? OnConnect; + event Action? OnReceive; + event Action? OnDisconnect; +#pragma warning restore 649 + + + } +} \ No newline at end of file diff --git a/CatCore/Services/KittenRawSocketProvider.cs b/CatCore/Services/KittenRawSocketProvider.cs new file mode 100644 index 0000000..63f7302 --- /dev/null +++ b/CatCore/Services/KittenRawSocketProvider.cs @@ -0,0 +1,188 @@ +using System; +using System.Collections.Concurrent; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using CatCore.Services.Interfaces; +using CatCore.Services.Sockets; +using CatCore.Services.Sockets.Packets; +using Serilog; + +namespace CatCore.Services +{ + // Just copied sample code for socket server + internal class KittenRawSocketProvider : IKittenRawSocketProvider + { + private const int SOCKET_PORT = 8338; + + // Not sure if concurrent is needed here + private readonly ConcurrentDictionary _connectedClients = new ConcurrentDictionary(); + + private static SemaphoreSlim NewSemaphore() + { + return new SemaphoreSlim(0, 1); + } + + // Thread signal. + private SemaphoreSlim _allDone = NewSemaphore(); + private readonly ILogger _logger; + + private bool _isServerRunning; + + internal CancellationTokenSource? ServerCts { get; private set; } + +#pragma warning disable 649 + public event Action? OnConnect; + public event Action? OnReceive; + public event Action? OnDisconnect; +#pragma warning restore 649 + + public KittenRawSocketProvider(ILogger logger) + { + _logger = logger; + } + + private bool ValidateServerNotRunning() + { + return !_isServerRunning && !ServerCts?.IsCancellationRequested == null; + } + + private async void StartListening(CancellationTokenSource cts) + { + ServerCts = cts; + + // Establish the local endpoint for the socket. + IPAddress ipAddress = IPAddress.Any; + IPEndPoint localEndPoint = new IPEndPoint(ipAddress, SOCKET_PORT); + + // Create a TCP/IP socket. + Socket listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + + // Bind the socket to the local endpoint and listen for incoming connections. + try + { + _logger.Information($"Binding to port {localEndPoint.Address.MapToIPv4()}:{localEndPoint.Port}"); + listener.Bind(localEndPoint); + listener.Listen(2); //back log is amount of clients allowed to wait + + // Set the event to nonsignaled state. + _allDone = NewSemaphore(); + + _isServerRunning = true; + while (!ServerCts.IsCancellationRequested) + { + // Start an asynchronous socket to listen for connections. + _logger.Information("Waiting for a connection..."); + listener.BeginAccept( + AcceptCallback, + listener); + + + // Wait until a connection is made before continuing. + // this avoids eating CPU cycles + await _allDone.WaitAsync(ServerCts.Token).ConfigureAwait(false); + } + + } + catch (Exception e) + { + _logger.Error(e, "Failed to start Kitty socket listening server"); + } + + _isServerRunning = false; + } + + private void AcceptCallback(IAsyncResult ar) + { + // Signal the main thread to continue. + _allDone.Release(); + + if (ServerCts is null || ServerCts.IsCancellationRequested) + { + return; + } + + // Get the socket that handles the client request. + Socket listener = (Socket) ar.AsyncState; + Socket handler = listener.EndAccept(ar); + + var guid = Guid.NewGuid(); + + // Never have a duplicate + while (_connectedClients.ContainsKey(guid)) + { + guid = Guid.NewGuid(); + } + + ClientSocket clientSocket = new ClientSocket(handler, guid, ServerCts!, HandleDisconnect, HandleRead); + _connectedClients[guid] = clientSocket; + + OnConnect?.Invoke(clientSocket); + } + + private void HandleRead(ClientSocket clientSocket, string receivedData) + { + var p = Packet.TryGetPacketFromJson(receivedData, out _); + + if (p is null) + { + Log.Logger.Warning($"Unable to parse packet from {clientSocket.Uuid}, type is unknown"); + // return; + } + + OnReceive?.Invoke(clientSocket, p, receivedData); + } + + + private void HandleDisconnect(ClientSocket clientSocket) + { + var socket = clientSocket.WorkSocket; + + if (socket.Connected) + { + socket.Shutdown(SocketShutdown.Both); + socket.Close(); + } + + _connectedClients.TryRemove(clientSocket.Uuid, out _); + + OnDisconnect?.Invoke(clientSocket); + } + + public void Initialize() + { + if (!ValidateServerNotRunning()) + { + _logger.Warning("(This can be ignored if intentional) The server is still running, what is wrong with you? The poor kitty can't handle two socket servers! ;-;"); + return; + } + + _logger.Information("Starting socket server"); + + ServerCts = new CancellationTokenSource(); + Task.Run(() => + { + try + { + StartListening(ServerCts); + } + catch (Exception e) + { + _logger.Error(e, "Failed to start Kitty socket server"); + } + }); + } + + public void Dispose() + { + + ServerCts!.Dispose(); + } + + public bool isServerRunning() + { + return _isServerRunning; + } + } +} \ No newline at end of file diff --git a/CatCore/Services/Sockets/ClientSocket.cs b/CatCore/Services/Sockets/ClientSocket.cs new file mode 100644 index 0000000..eb8703b --- /dev/null +++ b/CatCore/Services/Sockets/ClientSocket.cs @@ -0,0 +1,207 @@ +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using CatCore.Services.Sockets.Packets; + +namespace CatCore.Services.Sockets +{ + + + // Not much, just represents a client + public class ClientSocket + { + // Client socket. + public readonly Socket WorkSocket; + public readonly Guid Uuid; + + private readonly BlockingCollection _packetsToSend = new(); + private readonly Action _onClose; + private readonly Action _onRead; + private readonly NetworkStream _socketStream; + + private bool _closed; + + // Size of receive buffer. + private const int BUFFER_SIZE = 4096; + private const char DELIMETER = '\n'; // Environment.NewLine; + + public ClientSocket(Socket workSocket, Guid uuid, CancellationTokenSource cts, Action onClose, Action onReceive) + { + Uuid = uuid; + WorkSocket = workSocket; + _socketStream = new NetworkStream(WorkSocket, false); + + // timeout in ms + // todo: configurable + _socketStream.ReadTimeout = 5000; + _socketStream.WriteTimeout = 5000; + + _onClose = onClose; + _onRead = onReceive; + + _ = Task.Run(async () => + { + await SendTaskLoop(cts); + }, cts.Token); + + + _ = Task.Run(async () => + { + await ReceiveTaskLoopStart(cts); + }, cts.Token); + } + + private async Task SendTaskLoop(CancellationTokenSource cts) + { + try + { + while (!cts.IsCancellationRequested && !_closed) + { + // Stop trying to send data + if (!WorkSocket.Connected) + { + Close(); + return; + } + + if (!_packetsToSend.TryTake(out var packet)) + { + await Task.Yield(); + continue; + } + + var bytesToSend = JsonSerializer.SerializeToUtf8Bytes(packet, packet.GetType()); + + await _socketStream.WriteAsync(bytesToSend, 0, bytesToSend.Length, cts.Token); + await _socketStream.FlushAsync(); + } + } + catch (SocketException e) + { + Console.Error.WriteLine(e); + Close(); + } + } + + private async Task ReceiveTaskLoopStart(CancellationTokenSource cts) + { + try + { + // Received data string. + var receivedDataStr = new StringBuilder(); + + void ReadFlush(StringBuilder data) + { + try + { + // All data has been finalized, invoke callback + _onRead(this, data.ToString()); + } + catch (Exception e) + { + Console.Error.WriteLine(e); + } + + // Clear string + data.Clear(); + } + + while (!cts.IsCancellationRequested && !_closed) + { + // Stop trying to receive data + if (!WorkSocket.Connected) + { + Close(); + return; + } + + + // Receive buffer. + // the buffer is used to store the received bytes temporarily + // and cleared when they are later parsed into receivedData + var buffer = new byte[BUFFER_SIZE]; + + + var bytesRead = await _socketStream.ReadAsync(buffer, 0, BUFFER_SIZE, cts.Token); + + // If 0, no more data is coming + if (bytesRead <= 0) + { + ReadFlush(receivedDataStr); + } + else + { + var str = Encoding.UTF8.GetString(buffer, 0, bytesRead); + + // if the string already contains a delimiter, + // split it. This way, multiple strings sent at once can be parsed + if (str.Contains(DELIMETER)) + { + var strings = str.Split(DELIMETER); + + var index = 0; + + foreach (var s in strings) + { + if (index >= strings.Length - 1) + { + break; + } + + receivedDataStr.Append(s); + + ReadFlush(receivedDataStr); + index++; + } + + continue; + } + + // There might be more data, so store the data received so far. + receivedDataStr.Append(str); + + Console.WriteLine(receivedDataStr); + } + } + } + catch (SocketException e) + { + Console.Error.WriteLine(e); + Close(); + } + + Console.WriteLine("Done listening"); + } + + public void QueueSend(Packet packet) + { + if (!WorkSocket.Connected) + { + Close(); + throw new IOException("Socket has been closed!"); + } + + _packetsToSend.Add(packet); + } + + private void Close() + { + if (_closed) + { + return; + } + + _socketStream.Dispose(); + + _onClose.Invoke(this); + _closed = true; + } + } + +} \ No newline at end of file diff --git a/CatCore/Services/Sockets/Packet.cs b/CatCore/Services/Sockets/Packet.cs new file mode 100644 index 0000000..526ee3c --- /dev/null +++ b/CatCore/Services/Sockets/Packet.cs @@ -0,0 +1,96 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading.Tasks; + +namespace CatCore.Services.Sockets.Packets +{ + /// + /// Abstract class to define packet types + /// + public abstract class Packet + { + // Force packet name to be runtime constant + public string PacketName => GetType().Name; + + public static Packet? GetPacketFromJson(string json) + { + if (string.IsNullOrEmpty(json)) + { + return null; + } + + var jsonNode = JsonSerializer.Deserialize>(json); + + if (jsonNode == null) + { + return null; + } + + if (!jsonNode.TryGetValue(nameof(PacketName), out var packetNameElm)) + { + return null; + } + + var packetName = packetNameElm.GetString(); + + if (packetName == null) + { + return null; + } + + var type = GetPacketTypeByName(packetName); + return type == null ? null : JsonSerializer.Deserialize(json, type) as Packet; + } + + public static Packet? TryGetPacketFromJson(string json, out Exception? exception) + { + exception = null; + try + { + return GetPacketFromJson(json); + } + catch (Exception e) + { + exception = e; + return null; + } + } + + private static Type? GetPacketTypeByName(string name) + { + var type = Type.GetType($"{typeof(Packet).Namespace}.{name}"); + + if (type == null || type == typeof(Packet) || !typeof(Packet).IsAssignableFrom(type.BaseType)) + { + return null; + } + + return type; + } + } + + public class GetHello : Packet + { + [JsonConstructor] + private GetHello(string hello) + { + this.Hello = hello; + } + + public string Hello { get; } + } + + public class RespondHello : Packet + { + [JsonConstructor] + public RespondHello(string helloToSend) + { + HelloBack = helloToSend; + } + + [JsonInclude] + public string HelloBack { get; } + } +} \ No newline at end of file