Skip to content

Commit ce34dce

Browse files
committed
[+] Adapt code from AquaMai
1 parent 28b9d79 commit ce34dce

File tree

5 files changed

+1502
-0
lines changed

5 files changed

+1502
-0
lines changed

mod/WorldLink/FutariClient.cs

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Diagnostics;
4+
using System.IO;
5+
using System.Linq;
6+
using System.Net;
7+
using System.Net.Sockets;
8+
using System.Text;
9+
using HarmonyLib;
10+
#if !UNITY_2018_1_OR_NEWER
11+
using System.Threading;
12+
using AMDaemon;
13+
using PartyLink;
14+
using static Manager.Accounting;
15+
#endif
16+
17+
public class FutariClient
18+
{
19+
public static string LOBBY_BASE => AquaMai.ReadString("Mods.WorldLink.LobbyUrl");
20+
// public const string LOBBY_BASE = "https://aquadx.net/aqua/mai2-futari";
21+
public static FutariClient Instance { get; private set; }
22+
23+
public FutariClient(string keychip, string host, int port, int _)
24+
{
25+
this.host = host;
26+
this.port = port;
27+
this.keychip = keychip;
28+
}
29+
30+
public FutariClient(string keychip, string host, int port) : this(keychip, host, port, 0)
31+
{
32+
Instance = this;
33+
}
34+
35+
public string keychip { get; set; }
36+
37+
private TcpClient _tcpClient;
38+
private StreamWriter _writer;
39+
private StreamReader _reader;
40+
41+
public readonly ConcurrentQueue<FutariMsg> sendQ = new();
42+
// <Port + Stream ID, Message Queue>
43+
public readonly ConcurrentDictionary<int, ConcurrentQueue<FutariMsg>> tcpRecvQ = new();
44+
// <Port, Message Queue>
45+
public readonly ConcurrentDictionary<int, ConcurrentQueue<FutariMsg>> udpRecvQ = new();
46+
// <Port, Accept Queue>
47+
public readonly ConcurrentDictionary<int, ConcurrentQueue<FutariMsg>> acceptQ = new();
48+
// <Port + Stream ID, Callback>
49+
public readonly ConcurrentDictionary<int, Action<FutariMsg>> acceptCallbacks = new();
50+
51+
private System.Threading.Thread _sendThread;
52+
private System.Threading.Thread _recvThread;
53+
54+
private bool _reconnecting = false;
55+
56+
private readonly Stopwatch _heartbeat = new Stopwatch().Also(it => it.Start());
57+
private readonly long[] _delayWindow = new int[20].Select(_ => -1L).ToArray();
58+
public int _delayIndex = 0;
59+
public long _delayAvg = 0;
60+
private readonly string host;
61+
private readonly int port;
62+
63+
public IPAddress StubIP => FutariExt.KeychipToStubIp(keychip).ToIP();
64+
65+
/// <summary>
66+
/// -1: Failed to connect
67+
/// 0: Not connect
68+
/// 1: Connecting
69+
/// 2: Connected
70+
/// </summary>
71+
public int StatusCode { get; private set; } = 0;
72+
public string ErrorMsg { get; private set; } = "";
73+
74+
public void ConnectAsync() => new System.Threading.Thread(Connect) { IsBackground = true }.Start();
75+
76+
private void Connect()
77+
{
78+
_tcpClient = new TcpClient();
79+
80+
try
81+
{
82+
StatusCode = 1;
83+
_tcpClient.Connect(host, port);
84+
StatusCode = 2;
85+
}
86+
catch (Exception ex)
87+
{
88+
StatusCode = -1;
89+
ErrorMsg = ex.Message;
90+
Log.Error($"Error connecting to server:\nHost:{host}:{port}\n{ex.Message}");
91+
ConnectAsync();
92+
return;
93+
}
94+
var networkStream = _tcpClient.GetStream();
95+
_writer = new StreamWriter(networkStream, Encoding.UTF8) { AutoFlush = true };
96+
_reader = new StreamReader(networkStream, Encoding.UTF8);
97+
_reconnecting = false;
98+
99+
// Register
100+
Send(new FutariMsg { FutariCmd = FutariCmd.CTL_START, data = keychip });
101+
Log.Info($"Connected to server at {host}:{port}");
102+
103+
// Start communication and message receiving in separate threads
104+
_sendThread = 10.Interval(() =>
105+
{
106+
if (_heartbeat.ElapsedMilliseconds > 1000)
107+
{
108+
_heartbeat.Restart();
109+
Send(new FutariMsg { FutariCmd = FutariCmd.CTL_HEARTBEAT });
110+
}
111+
112+
// Send any data in the send queue
113+
while (sendQ.TryDequeue(out var msg)) Send(msg);
114+
115+
}, final: Reconnect, name: "SendThread", stopOnError: true);
116+
117+
_recvThread = 10.Interval(() =>
118+
{
119+
var line = _reader.ReadLine();
120+
if (line == null) return;
121+
122+
var message = FutariMsg.FromString(line);
123+
HandleIncomingMessage(message);
124+
125+
}, final: Reconnect, name: "RecvThread", stopOnError: true);
126+
}
127+
128+
public void Bind(int bindPort, ProtocolType proto)
129+
{
130+
if (proto == ProtocolType.Tcp)
131+
acceptQ.TryAdd(bindPort, new ConcurrentQueue<FutariMsg>());
132+
else if (proto == ProtocolType.Udp)
133+
udpRecvQ.TryAdd(bindPort, new ConcurrentQueue<FutariMsg>());
134+
}
135+
136+
private void Reconnect()
137+
{
138+
Log.Warn("Reconnect Entered");
139+
if (_reconnecting) return;
140+
_reconnecting = true;
141+
142+
try { _tcpClient.Close(); }
143+
catch { /* ignored */ }
144+
145+
try { _sendThread.Abort(); }
146+
catch { /* ignored */ }
147+
148+
try { _recvThread.Abort(); }
149+
catch { /* ignored */ }
150+
151+
_sendThread = null;
152+
_recvThread = null;
153+
_tcpClient = null;
154+
155+
// Reconnect
156+
Log.Warn("Reconnecting...");
157+
ConnectAsync();
158+
}
159+
160+
private void HandleIncomingMessage(FutariMsg futariMsg)
161+
{
162+
if (futariMsg.FutariCmd != FutariCmd.CTL_HEARTBEAT)
163+
Log.Info($"{StubIP} <<< {futariMsg.ToReadableString()}");
164+
165+
switch (futariMsg.FutariCmd)
166+
{
167+
// Heartbeat
168+
case FutariCmd.CTL_HEARTBEAT:
169+
var delay = _heartbeat.ElapsedMilliseconds;
170+
_delayWindow[_delayIndex] = delay;
171+
_delayIndex = (_delayIndex + 1) % _delayWindow.Length;
172+
_delayAvg = (long) _delayWindow.Where(x => x != -1).Average();
173+
Log.Info($"Heartbeat: {delay}ms, Avg: {_delayAvg}ms");
174+
break;
175+
176+
// UDP message
177+
case FutariCmd.DATA_SEND or FutariCmd.DATA_BROADCAST when futariMsg is { proto: ProtocolType.Udp, dPort: not null }:
178+
udpRecvQ.Get(futariMsg.dPort.Value)?.Also(q =>
179+
{
180+
Log.Info($"+ Added to UDP queue, there are {q.Count + 1} messages in queue");
181+
})?.Enqueue(futariMsg);
182+
break;
183+
184+
// TCP message
185+
case FutariCmd.DATA_SEND when futariMsg.proto == ProtocolType.Tcp && futariMsg is { sid: not null, dPort: not null }:
186+
tcpRecvQ.Get(futariMsg.sid.Value + futariMsg.dPort.Value)?.Also(q =>
187+
{
188+
Log.Info($"+ Added to TCP queue, there are {q.Count + 1} messages in queue for port {futariMsg.dPort}");
189+
})?.Enqueue(futariMsg);
190+
break;
191+
192+
// TCP connection request
193+
case FutariCmd.CTL_TCP_CONNECT when futariMsg.dPort != null:
194+
acceptQ.Get(futariMsg.dPort.Value)?.Also(q =>
195+
{
196+
Log.Info($"+ Added to Accept queue, there are {q.Count + 1} messages in queue");
197+
})?.Enqueue(futariMsg);
198+
break;
199+
200+
// TCP connection accept
201+
case FutariCmd.CTL_TCP_ACCEPT when futariMsg is { sid: not null, dPort: not null }:
202+
acceptCallbacks.Get(futariMsg.sid.Value + futariMsg.dPort.Value)?.Invoke(futariMsg);
203+
break;
204+
}
205+
}
206+
207+
private void Send(FutariMsg futariMsg)
208+
{
209+
// Check if msg's destination ip is the same as my local ip. If so, handle it locally
210+
if (futariMsg.dst == StubIP.ToU32())
211+
{
212+
Log.Debug($"Loopback @@@ {futariMsg.ToReadableString()}");
213+
HandleIncomingMessage(futariMsg);
214+
return;
215+
}
216+
217+
_writer.WriteLine(futariMsg);
218+
if (futariMsg.FutariCmd != FutariCmd.CTL_HEARTBEAT)
219+
Log.Info($"{StubIP} >>> {futariMsg.ToReadableString()}");
220+
}
221+
}

mod/WorldLink/FutariExt.cs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
#nullable enable
2+
using System;
3+
using System.Collections.Concurrent;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Net;
7+
using System.Security.Cryptography;
8+
using System.Text;
9+
using System.Threading;
10+
using Manager.Party.Party;
11+
using PartyLink;
12+
using HarmonyLib;
13+
14+
public static class FutariExt
15+
{
16+
private static uint HashStringToUInt(string input)
17+
{
18+
using var md5 = MD5.Create();
19+
var hashBytes = md5.ComputeHash(Encoding.UTF8.GetBytes(input));
20+
return ((uint)(hashBytes[0] & 0xFF) << 24) |
21+
((uint)(hashBytes[1] & 0xFF) << 16) |
22+
((uint)(hashBytes[2] & 0xFF) << 8) |
23+
((uint)(hashBytes[3] & 0xFF));
24+
}
25+
26+
public static uint KeychipToStubIp(string keychip) => HashStringToUInt(keychip);
27+
28+
public static IPAddress ToIP(this uint val) => new(new IpAddress(val).GetAddressBytes());
29+
public static uint ToU32(this IPAddress ip) => ip.ToNetworkByteOrderU32();
30+
31+
public static void Do<T>(this T x, Action<T> f) => f(x);
32+
public static R Let<T, R>(this T x, Func<T, R> f) => f(x);
33+
public static T Also<T>(this T x, Action<T> f) { f(x); return x; }
34+
35+
public static List<T> Each<T>(this IEnumerable<T> enu, Action<T> f) =>
36+
enu.ToList().Also(x => x.ForEach(f));
37+
38+
public static byte[] View(this byte[] buffer, int offset, int size)
39+
{
40+
var array = new byte[size];
41+
Array.Copy(buffer, offset, array, 0, size);
42+
return array;
43+
}
44+
45+
public static string B64(this byte[] buffer) => Convert.ToBase64String(buffer);
46+
public static byte[] B64(this string str) => Convert.FromBase64String(str);
47+
48+
public static V? Get<K, V>(this ConcurrentDictionary<K, V> dict, K key) where V : class
49+
{
50+
return dict.GetValueOrDefault(key);
51+
}
52+
53+
// Call a function using reflection
54+
public static void Call(this object obj, string method, params object[] args)
55+
{
56+
obj.GetType().GetMethod(method)?.Invoke(obj, args);
57+
}
58+
59+
public static uint MyStubIP() => KeychipToStubIp(FutariClient.Instance.keychip);
60+
61+
public static string Post(this string url, string body)
62+
{
63+
using var web = new WebClient();
64+
web.Encoding = Encoding.UTF8;
65+
web.Headers.Add("Content-Type", "application/json");
66+
return web.UploadString(new Uri(url), body);
67+
}
68+
69+
public static void PostAsync(this string url, string body, UploadStringCompletedEventHandler? callback = null)
70+
{
71+
using var web = new WebClient();
72+
if (callback != null) web.UploadStringCompleted += callback;
73+
web.Encoding = Encoding.UTF8;
74+
web.Headers.Add("Content-Type", "application/json");
75+
web.UploadStringAsync(new Uri(url), body);
76+
}
77+
78+
public static string Get(this string url)
79+
{
80+
using var web = new WebClient();
81+
web.Encoding = Encoding.UTF8;
82+
return web.DownloadString(new Uri(url));
83+
}
84+
85+
public static void GetAsync(this string url, DownloadStringCompletedEventHandler? callback = null)
86+
{
87+
using var web = new WebClient();
88+
if (callback != null) web.DownloadStringCompleted += callback;
89+
web.Encoding = Encoding.UTF8;
90+
web.DownloadStringAsync(new Uri(url));
91+
}
92+
93+
public static System.Threading.Thread Interval(
94+
this int delay, Action action, bool stopOnError = false,
95+
Action<Exception>? error = null, Action? final = null, string? name = null
96+
) => new System.Threading.Thread(() =>
97+
{
98+
name ??= $"Interval {System.Threading.Thread.CurrentThread.ManagedThreadId} for {action}";
99+
try
100+
{
101+
while (!Futari.stopping)
102+
{
103+
try
104+
{
105+
System.Threading.Thread.Sleep(delay);
106+
action();
107+
}
108+
catch (ThreadInterruptedException)
109+
{
110+
break;
111+
}
112+
catch (Exception e)
113+
{
114+
if (stopOnError) throw;
115+
Log.Error($"Error in {name}: {e}");
116+
}
117+
}
118+
}
119+
catch (Exception e)
120+
{
121+
Log.Error($"Fatal error in {name}: {e}");
122+
error?.Invoke(e);
123+
}
124+
finally
125+
{
126+
Log.Warn($"{name} stopped");
127+
final?.Invoke();
128+
}
129+
}).Also(x => x.Start());
130+
131+
}

0 commit comments

Comments
 (0)