Skip to content

Commit cf61b08

Browse files
committed
Fixed ODE
1 parent 948df24 commit cf61b08

File tree

1 file changed

+37
-43
lines changed

1 file changed

+37
-43
lines changed

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/OutputMultiplexer.cs

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -27,52 +27,51 @@ public MultiplexedStreamFactory Factory
2727
init => factory = value;
2828
}
2929

30-
public Task ProcessAsync(Socket socket)
30+
public async Task ProcessAsync(Socket socket)
3131
{
32-
var task = ProcessCoreAsync(socket);
33-
34-
// if output multiplexer is completed due to exception, we need to trigger
35-
// the input multiplexer to handle the error
36-
task.ConfigureAwait(false).GetAwaiter().UnsafeOnCompleted(TransportSignal.SetNoResult);
37-
return task;
38-
}
39-
40-
private async Task ProcessCoreAsync(Socket socket)
41-
{
42-
FrameHeader header;
43-
for (var bufferedBytes = 0;; AdjustFramingBuffer(ref bufferedBytes, header, framingBuffer.Span))
32+
try
4433
{
45-
StartOperation(Timeout); // resumed by heartbeat
46-
try
34+
FrameHeader header;
35+
for (var bufferedBytes = 0;; AdjustFramingBuffer(ref bufferedBytes, header, framingBuffer.Span))
4736
{
48-
// read at least header
49-
while (bufferedBytes < FrameHeader.Size)
37+
StartOperation(Timeout); // resumed by heartbeat
38+
try
5039
{
51-
bufferedBytes += await socket.ReceiveAsync(framingBuffer.Slice(bufferedBytes), TimeBoundedToken).ConfigureAwait(false);
40+
// read at least header
41+
while (bufferedBytes < FrameHeader.Size)
42+
{
43+
bufferedBytes += await socket.ReceiveAsync(framingBuffer.Slice(bufferedBytes), TimeBoundedToken).ConfigureAwait(false);
44+
}
45+
46+
header = FrameHeader.Parse(framingBuffer.Span);
47+
48+
// read the fragment
49+
while (bufferedBytes < header.Length + FrameHeader.Size)
50+
{
51+
bufferedBytes += await socket.ReceiveAsync(framingBuffer.Slice(bufferedBytes), TimeBoundedToken).ConfigureAwait(false);
52+
}
5253
}
53-
54-
header = FrameHeader.Parse(framingBuffer.Span);
55-
56-
// read the fragment
57-
while (bufferedBytes < header.Length + FrameHeader.Size)
54+
catch (OperationCanceledException e) when (IsOperationCanceled(e))
5855
{
59-
bufferedBytes += await socket.ReceiveAsync(framingBuffer.Slice(bufferedBytes), TimeBoundedToken).ConfigureAwait(false);
56+
throw new OperationCanceledException(ExceptionMessages.ConnectionClosed, e, RootToken);
57+
}
58+
catch (OperationCanceledException e) when (IsOperationTimedOut(e))
59+
{
60+
throw new TimeoutException(ExceptionMessages.ConnectionTimedOut, e);
61+
}
62+
finally
63+
{
64+
await ResetOperationTimeoutAsync().ConfigureAwait(false);
6065
}
61-
}
62-
catch (OperationCanceledException e) when (IsOperationCanceled(e))
63-
{
64-
throw new OperationCanceledException(ExceptionMessages.ConnectionClosed, e, RootToken);
65-
}
66-
catch (OperationCanceledException e) when (IsOperationTimedOut(e))
67-
{
68-
throw new TimeoutException(ExceptionMessages.ConnectionTimedOut, e);
69-
}
70-
finally
71-
{
72-
await ResetOperationTimeoutAsync().ConfigureAwait(false);
73-
}
7466

75-
await ReadFrameAsync(header).ConfigureAwait(false);
67+
await ReadFrameAsync(header).ConfigureAwait(false);
68+
}
69+
}
70+
finally
71+
{
72+
// if output multiplexer is completed due to exception, we need to trigger
73+
// the input multiplexer to handle the error
74+
TransportSignal.Set();
7675
}
7776
}
7877

@@ -113,9 +112,4 @@ private static void AdjustFramingBuffer(ref int bufferedBytes, FrameHeader heade
113112
.Slice(bytesWritten, bufferedBytes -= bytesWritten)
114113
.CopyTo(framingBuffer);
115114
}
116-
}
117-
118-
file static class AsyncAutoResetEventExtensions
119-
{
120-
public static void SetNoResult(this AsyncAutoResetEvent resetEvent) => resetEvent.Set();
121115
}

0 commit comments

Comments
 (0)