Skip to content

Commit 7e5e4ec

Browse files
committed
cleaning
1 parent a96ea6d commit 7e5e4ec

32 files changed

+532
-408
lines changed

Benchmarks/RelayBenchmark/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ private static void RelayTest()
200200
Console.ReadLine();
201201
//Task.Run(async () => { while (true) { await Task.Delay(10000); server.GetTcpStatistics(out var generalStats, out _); Console.WriteLine(generalStats.ToString()); } });
202202
var clients = new List<RelayClient>();
203-
int numclients = 100;
203+
int numclients = 50;
204204
var pending = new Task[numclients];
205205
Task.Run(async () => { while (true) { await Task.Delay(1000); Console.WriteLine(Interlocked.Exchange(ref sumsum, 0).ToString("N0")); } });
206206

NetworkLibrary/Components/MessageBuffer/MessageBuffer.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class MessageBuffer : IMessageQueue
1212

1313
protected PooledMemoryStream writeStream = new PooledMemoryStream();
1414
protected PooledMemoryStream flushStream = new PooledMemoryStream();
15-
protected readonly object loki = new object();
15+
protected readonly object bufferMtex = new object();
1616
protected bool writeLengthPrefix;
1717
protected int currentIndexedMemory;
1818
protected bool disposedValue;
@@ -30,7 +30,7 @@ public bool IsEmpty()
3030

3131
public bool TryEnqueueMessage(byte[] bytes)
3232
{
33-
lock (loki)
33+
lock (bufferMtex)
3434
{
3535
if (currentIndexedMemory < MaxIndexedMemory && !disposedValue)
3636
{
@@ -54,7 +54,7 @@ public bool TryEnqueueMessage(byte[] bytes)
5454
}
5555
public bool TryEnqueueMessage(byte[] bytes, int offset, int count)
5656
{
57-
lock (loki)
57+
lock (bufferMtex)
5858
{
5959
if (currentIndexedMemory < MaxIndexedMemory && !disposedValue)
6060
{
@@ -77,7 +77,7 @@ public bool TryEnqueueMessage(byte[] bytes, int offset, int count)
7777
}
7878
public bool TryFlushQueue(ref byte[] buffer, int offset, out int amountWritten)
7979
{
80-
lock (loki)
80+
lock (bufferMtex)
8181
{
8282
if (IsEmpty())
8383
{
@@ -102,7 +102,7 @@ public bool TryFlushQueue(ref byte[] buffer, int offset, out int amountWritten)
102102
}
103103
public bool TryEnqueueMessage(byte[] data1, int offset1, int count1, byte[] data2, int offset2, int count2)
104104
{
105-
lock (loki)
105+
lock (bufferMtex)
106106
{
107107
if (currentIndexedMemory < MaxIndexedMemory && !disposedValue)
108108
{
@@ -126,7 +126,7 @@ public bool TryEnqueueMessage(byte[] data1, int offset1, int count1, byte[] data
126126
}
127127
protected virtual void Dispose(bool disposing)
128128
{
129-
lock (loki)
129+
lock (bufferMtex)
130130
{
131131
if (!disposedValue)
132132
{

NetworkLibrary/Components/MessageProcessor/Unmanaged/UnsafeDelimitedMessageWriter.cs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,7 @@ public unsafe bool ProcessMessage(byte[] message)
3737
{
3838
fixed (byte* b = &bufferInternal[offset])
3939
*(int*)b = message.Length;
40-
//bufferInternal[offset] = (byte)message.Length;
41-
//bufferInternal[offset + 1] = (byte)(message.Length >> 8);
42-
//bufferInternal[offset + 2] = (byte)(message.Length >> 16);
43-
//bufferInternal[offset + 3] = (byte)(message.Length >> 24);
40+
4441
offset += 4;
4542
count += 4;
4643
}
@@ -57,7 +54,6 @@ public unsafe bool ProcessMessage(byte[] message)
5754

5855
if (bufferInternal.Length - offset >= message.Length)
5956
{
60-
//System.Buffer.BlockCopy(message, 0, bufferInternal, offset, message.Length);
6157
fixed (byte* destination = &bufferInternal[offset])
6258
{
6359
fixed (byte* message_ = message)
@@ -93,7 +89,6 @@ public unsafe bool Flush()
9389
}
9490
if (pendingRemaining <= bufferInternal.Length - offset)
9591
{
96-
//System.Buffer.BlockCopy(pendingMessage, pendingMessageOffset, bufferInternal, offset, pendingRemaining);
9792
fixed (byte* destination = &bufferInternal[offset])
9893
{
9994
fixed (byte* message_ = &pendingMessage[pendingMessageOffset])
@@ -110,7 +105,6 @@ public unsafe bool Flush()
110105
}
111106
else
112107
{
113-
//System.Buffer.BlockCopy(pendingMessage, pendingMessageOffset, bufferInternal, offset, bufferInternal.Length - offset);
114108
fixed (byte* destination = &bufferInternal[offset])
115109
{
116110
fixed (byte* message_ = &pendingMessage[pendingMessageOffset])

NetworkLibrary/MessageProtocol/Fast/GenericMessageQueue.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public GenericMessageQueue(int maxIndexedMemory, bool writeLengthPrefix = true)
1717

1818
public bool TryEnqueueMessage<T>(MessageEnvelope envelope, T message)
1919
{
20-
lock (loki)
20+
lock (bufferMtex)
2121
{
2222
if (currentIndexedMemory < MaxIndexedMemory && !disposedValue)
2323
{
@@ -45,7 +45,7 @@ public bool TryEnqueueMessage<T>(MessageEnvelope envelope, T message)
4545

4646
public bool TryEnqueueMessage(MessageEnvelope envelope, Action<PooledMemoryStream> serializationCallback)
4747
{
48-
lock (loki)
48+
lock (bufferMtex)
4949
{
5050
if (currentIndexedMemory < MaxIndexedMemory && !disposedValue)
5151
{
@@ -74,7 +74,7 @@ public bool TryEnqueueMessage(MessageEnvelope envelope, Action<PooledMemoryStrea
7474
public bool TryEnqueueMessage(MessageEnvelope envelope)
7575
{
7676

77-
lock (loki)
77+
lock (bufferMtex)
7878
{
7979
if (currentIndexedMemory < MaxIndexedMemory && !disposedValue)
8080
{

NetworkLibrary/MessageProtocol/Fast/Network/SecureMessageSession.cs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,7 @@ private void SendAsyncInternal(MessageEnvelope message)
8585
EndSession();
8686
return;
8787
}
88-
ThreadPool.UnsafeQueueUserWorkItem((s) =>
89-
{
90-
mq.TryFlushQueue(ref sendBuffer, 0, out int amountWritten);
91-
WriteOnSessionStream(amountWritten);
92-
}, null);
88+
FlushAndSend();
9389
}
9490

9591
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -138,11 +134,7 @@ private void SendAsyncInternal<T>(MessageEnvelope envelope, T message)
138134

139135
//mq.TryFlushQueue(ref sendBuffer, 0, out int amountWritten);
140136
//WriteOnSessionStream(amountWritten);
141-
ThreadPool.UnsafeQueueUserWorkItem((s) =>
142-
{
143-
mq.TryFlushQueue(ref sendBuffer, 0, out int amountWritten);
144-
WriteOnSessionStream(amountWritten);
145-
}, null);
137+
FlushAndSend();
146138

147139
}
148140

@@ -189,11 +181,7 @@ private void SendAsyncInternal(MessageEnvelope envelope, Action<PooledMemoryStre
189181
EndSession();
190182
return;
191183
}
192-
ThreadPool.UnsafeQueueUserWorkItem((s) =>
193-
{
194-
mq.TryFlushQueue(ref sendBuffer, 0, out int amountWritten);
195-
WriteOnSessionStream(amountWritten);
196-
}, null);
184+
FlushAndSend();
197185
//mq.TryFlushQueue(ref sendBuffer, 0, out int amountWritten);
198186
//WriteOnSessionStream(amountWritten);
199187

NetworkLibrary/MessageProtocol/Generic/Components/GenericMessageQueue.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public GenericMessageQueue(int maxIndexedMemory, bool writeLengthPrefix = true)
1818

1919
public bool TryEnqueueMessage<T>(E envelope, T message)
2020
{
21-
lock (loki)
21+
lock (bufferMtex)
2222
{
2323
if (currentIndexedMemory < MaxIndexedMemory && !disposedValue)
2424
{
@@ -47,7 +47,7 @@ public bool TryEnqueueMessage<T>(E envelope, T message)
4747
public bool TryEnqueueMessage(E envelope)
4848
{
4949

50-
lock (loki)
50+
lock (bufferMtex)
5151
{
5252
if (currentIndexedMemory < MaxIndexedMemory && !disposedValue)
5353
{

NetworkLibrary/MessageProtocol/Generic/SecureMessageSession.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ private void SendAsyncInternal(E message)
8080

8181
// you have to push it to queue because queue also does the processing.
8282
mq.TryEnqueueMessage(message);
83-
mq.TryFlushQueue(ref sendBuffer, 0, out int amountWritten);
84-
WriteOnSessionStream(amountWritten);
83+
FlushAndSend();
8584

8685
}
8786

@@ -123,8 +122,7 @@ private void SendAsyncInternal<T>(E envelope, T message)
123122

124123
// you have to push it to queue because queue also does the processing.
125124
mq.TryEnqueueMessage(envelope, message);
126-
mq.TryFlushQueue(ref sendBuffer, 0, out int amountWritten);
127-
WriteOnSessionStream(amountWritten);
125+
FlushAndSend();
128126

129127
}
130128
protected override void ReleaseReceiveResources()

NetworkLibrary/P2P/Components/StateManagemet/Client/ClientTcpHolepunchState.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ public ClientTcpHolepunchState(StateManager stateManager, IPEndPoint relayUdpEp,
6262
this.StateId = stateId;
6363
this.stateManager = stateManager;
6464
this.relayUdpEp = relayUdpEp;
65-
//Runt();
65+
//DebugSts();
6666
}
6767

68-
private async void Runt()
68+
private async void DebugSts()
6969
{
7070
await Task.Delay(5000);
7171

@@ -333,7 +333,7 @@ private void Accepted(Guid guid)
333333

334334
GenericMessageSerializer<MockSerializer> serializer = new GenericMessageSerializer<MockSerializer>();
335335
PooledMemoryStream stream = new PooledMemoryStream();
336-
AsyncUdpClient cl = new AsyncUdpClient();
336+
AsyncUdpClient cl = new AsyncUdpClient();
337337
private static object globalLock = new object();
338338
private void SendUdpPortMapMsg()
339339
{

NetworkLibrary/P2P/Components/StateManagemet/Server/ServerTcpHolepunchState.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ public ServerTcpHolepunchState(StateManager sm, Guid stateId)
4242
}
4343
public async void Initialize(MessageEnvelope message)
4444
{
45-
MiniLogger.Log(MiniLogger.LogLevel.Debug, "Initialising Server HP State");
4645
requesterEndpoints = KnownTypeSerializer.DeserializeEndpointTransferMessage(message.Payload, message.PayloadOffset);
4746
requesterId = message.From;
4847
destinationId = message.To;
48+
MiniLogger.Log(MiniLogger.LogLevel.Info, $"Initialising Server HP State between: {requesterId} -> {destinationId}");
4949

5050
MessageEnvelope msg = new MessageEnvelope
5151
{
@@ -70,7 +70,7 @@ public async void Initialize(MessageEnvelope message)
7070

7171
while (!portMapComplete && released == 0)
7272
{
73-
MiniLogger.Log(MiniLogger.LogLevel.Warning, "awaiting port map");
73+
MiniLogger.Log(MiniLogger.LogLevel.Debug, "awaiting port map");
7474
if(DestinationRemoteEp == null)
7575
{
7676
var msg1 = new MessageEnvelope()
@@ -198,7 +198,7 @@ private void HandleSuccess(MessageEnvelope message)
198198
}
199199
if(Interlocked.CompareExchange(ref requesterSuccess,0,0) == 1 && Interlocked.CompareExchange(ref destinationSuccess, 0, 0) == 1)
200200
{
201-
201+
MiniLogger.Log(MiniLogger.LogLevel.Info, $"Succesfully punched TCP hole between: {requesterId} -> {destinationId}");
202202
var msg = new MessageEnvelope()
203203
{
204204
MessageId = StateId,
@@ -294,6 +294,11 @@ public void Release(bool isCompletedSuccessfully)
294294
{
295295
Status = StateStatus.Completed;
296296
}
297+
else
298+
{
299+
MiniLogger.Log(MiniLogger.LogLevel.Info, $"Failed to punch TCP hole between: {requesterId} -> {destinationId}");
300+
301+
}
297302
Completed?.Invoke(this);
298303
Completed = null;
299304
}

NetworkLibrary/P2P/Generic/RelayServerBase.cs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,6 @@ private void InitiateHolepunchBetweenPeers(MessageEnvelope message)
283283
Header = Constants.InitiateHolepunch
284284
};
285285

286-
//if (endpointsD.Last().ToIpEndpoint().Address == ((IPEndPoint)udpServer.LocalEndpoint).Address)
287-
// mreq.KeyValuePairs = new Dictionary<string, string>() { { "UseRelayIp", null } };
288-
289286
SendAsyncMessage(message.From,mreq,
290287
(stream) => KnownTypeSerializer.SerializeEndpointTransferMessage(stream,
291288
new EndpointTransferMessage()
@@ -305,9 +302,7 @@ private void InitiateHolepunchBetweenPeers(MessageEnvelope message)
305302
Header = Constants.InitiateHolepunch
306303
};
307304

308-
//if (endpointsR.Last().ToIpEndpoint().Address == ((IPEndPoint)udpServer.LocalEndpoint).Address)
309-
// mreq.KeyValuePairs = new Dictionary<string, string>() { { "UseRelayIp", null } };
310-
305+
311306
SendAsyncMessage(message.To, mdest,
312307
(stream) => KnownTypeSerializer.SerializeEndpointTransferMessage(stream,
313308
new EndpointTransferMessage() {
@@ -483,11 +478,7 @@ private void HandleUnregistreredMessage(IPEndPoint adress, byte[] bytes, int off
483478
return;
484479

485480
}
486-
//if (count % 16 != 0)
487-
//{
488-
// MiniLogger.Log(MiniLogger.LogLevel.Error, "Udp Relay failed to decrypt unregistered peer message");
489-
// return;
490-
//}
481+
491482
try
492483
{
493484
byte[] result = relayDectriptor.Decrypt(bytes, offset, count);

0 commit comments

Comments
 (0)