Skip to content

Commit adbe7c8

Browse files
committed
Code cleanup
1 parent fec898e commit adbe7c8

File tree

1 file changed

+31
-30
lines changed

1 file changed

+31
-30
lines changed

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/OutputMultiplexer.cs

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -72,40 +72,41 @@ private async Task ProcessCoreAsync(Socket socket)
7272
await ResetOperationTimeoutAsync().ConfigureAwait(false);
7373
}
7474

75-
MultiplexedStream? stream;
76-
if (header.Control is FrameControl.Heartbeat)
77-
{
78-
continue;
79-
}
80-
else if (Streams.TryGetValue(header.Id, out stream))
81-
{
82-
if (stream.IsTransportOutputCompleted)
83-
continue;
84-
}
85-
else if (factory is null || header.CanBeIgnored)
86-
{
87-
continue;
88-
}
89-
else if ((stream = factory(TransportSignal, in MeasurementTags)) is null)
90-
{
91-
Commands.TryAdd(new StreamRejectedCommand(header.Id));
92-
TransportSignal.Set();
93-
continue;
94-
}
95-
else
96-
{
97-
Streams[header.Id] = stream;
98-
ChangeStreamCount();
99-
}
75+
await ReadFrameAsync(header).ConfigureAwait(false);
76+
}
77+
}
78+
79+
private ValueTask ReadFrameAsync(FrameHeader header)
80+
=> GetOrCreateStream(header) is { } stream
81+
? stream.ReadFrameAsync(header.Control, framingBuffer.Slice(FrameHeader.Size, header.Length), RootToken)
82+
: ValueTask.CompletedTask;
83+
84+
private MultiplexedStream? GetOrCreateStream(FrameHeader header)
85+
{
86+
if (header.Control is FrameControl.Heartbeat)
87+
return null;
88+
89+
if (Streams.TryGetValue(header.Id, out var stream))
90+
return stream.IsTransportOutputCompleted ? null : stream;
10091

101-
// write the frame to the output header
102-
await stream
103-
.ReadFrameAsync(header.Control, framingBuffer.Slice(FrameHeader.Size, header.Length), RootToken)
104-
.ConfigureAwait(false);
92+
if (factory is null || header.CanBeIgnored)
93+
return null;
94+
95+
if ((stream = factory(TransportSignal, in MeasurementTags)) is null)
96+
{
97+
Commands.TryAdd(new StreamRejectedCommand(header.Id));
98+
TransportSignal.Set();
10599
}
100+
else
101+
{
102+
Streams[header.Id] = stream;
103+
ChangeStreamCount();
104+
}
105+
106+
return stream;
106107
}
107108

108-
private static void AdjustFramingBuffer(ref int bufferedBytes, in FrameHeader header, Span<byte> framingBuffer)
109+
private static void AdjustFramingBuffer(ref int bufferedBytes, FrameHeader header, Span<byte> framingBuffer)
109110
{
110111
var bytesWritten = header.Length + FrameHeader.Size;
111112
framingBuffer

0 commit comments

Comments
 (0)