Skip to content

Commit 9f7e4f6

Browse files
committed
Use required members to improve the code clarity
1 parent 609bb4f commit 9f7e4f6

File tree

6 files changed

+110
-81
lines changed

6 files changed

+110
-81
lines changed

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/InputMultiplexer.cs

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Collections.Concurrent;
2-
using System.Diagnostics;
32
using System.Net.Sockets;
43
using System.Runtime.CompilerServices;
54

@@ -8,50 +7,61 @@ namespace DotNext.Net.Multiplexing;
87
using Buffers;
98
using Threading;
109

11-
internal sealed class InputMultiplexer<T>(
12-
ConcurrentDictionary<uint, MultiplexedStream> streams,
13-
AsyncAutoResetEvent writeSignal,
14-
BufferWriter<byte> framingBuffer,
15-
int flushThreshold,
16-
in TagList measurementTags,
17-
TimeSpan timeout,
18-
TimeSpan heartbeatTimeout,
19-
CancellationToken token) : Multiplexer<T>(streams, new ConcurrentQueue<ProtocolCommand>(), measurementTags, token)
10+
internal sealed class InputMultiplexer<T>() : Multiplexer<T>(new(), new ConcurrentQueue<ProtocolCommand>())
2011
where T : IStreamMetrics
2112
{
22-
23-
public TimeSpan Timeout => timeout;
13+
public required TimeSpan Timeout { get; init; }
14+
15+
public required TimeSpan HeartbeatTimeout { private get; init; }
16+
17+
public required int FlushThreshold { private get; init; }
18+
19+
public required BufferWriter<byte> FramingBuffer { private get; init; }
20+
21+
public required AsyncAutoResetEvent TransportSignal { private get; init; }
2422

2523
public bool TryAddStream(uint streamId, MultiplexedStream stream)
2624
{
27-
var result = streams.TryAdd(streamId, stream);
25+
var result = Streams.TryAdd(streamId, stream);
2826
ChangeStreamCount(Unsafe.BitCast<bool, byte>(result));
2927
return result;
3028
}
3129

3230
public bool TryRemoveStream(uint streamId, MultiplexedStream stream)
3331
{
34-
var removed = streams.TryRemove(new(streamId, stream));
32+
var removed = Streams.TryRemove(new(streamId, stream));
3533
ChangeStreamCount(-Unsafe.BitCast<bool, byte>(removed));
3634
return removed;
3735
}
3836

39-
public OutputMultiplexer<T> CreateOutput(Memory<byte> framingBuffer, TimeSpan receiveTimeout)
40-
=> new(streams, writeSignal, commands, framingBuffer, measurementTags, receiveTimeout, RootToken);
37+
public OutputMultiplexer<T> CreateOutput(Memory<byte> framingBuffer, TimeSpan receiveTimeout) => new(Streams, Commands)
38+
{
39+
MeasurementTags = MeasurementTags,
40+
RootToken = RootToken,
41+
FramingBuffer = framingBuffer,
42+
Timeout = receiveTimeout,
43+
TransportSignal = TransportSignal,
44+
};
4145

4246
public OutputMultiplexer<T> CreateOutput(Memory<byte> framingBuffer, TimeSpan receiveTimeout, MultiplexedStreamFactory handlerFactory,
43-
CancellationToken token)
44-
=> new(streams, writeSignal, commands, framingBuffer, measurementTags, receiveTimeout, token)
45-
{ Factory = handlerFactory };
47+
CancellationToken token) => new(Streams, Commands)
48+
{
49+
MeasurementTags = MeasurementTags,
50+
RootToken = token,
51+
FramingBuffer = framingBuffer,
52+
Timeout = receiveTimeout,
53+
TransportSignal = TransportSignal,
54+
Factory = handlerFactory,
55+
};
4656

4757
public async Task ProcessAsync(Func<bool> condition, Socket socket)
4858
{
49-
using var enumerator = streams.GetEnumerator();
59+
using var enumerator = Streams.GetEnumerator();
5060
for (var requiresHeartbeat = false;
5161
condition();
52-
requiresHeartbeat = !await writeSignal.WaitAsync(heartbeatTimeout, RootToken).ConfigureAwait(false))
62+
requiresHeartbeat = !await TransportSignal.WaitAsync(HeartbeatTimeout, RootToken).ConfigureAwait(false))
5363
{
54-
framingBuffer.Clear(reuseBuffer: true);
64+
FramingBuffer.Clear(reuseBuffer: true);
5565

5666
// combine streams
5767
while (enumerator.MoveNext())
@@ -60,33 +70,33 @@ public async Task ProcessAsync(Func<bool> condition, Socket socket)
6070

6171
if (stream.IsCompleted && TryRemoveStream(streamId, stream))
6272
{
63-
Protocol.WriteStreamClosed(framingBuffer, streamId);
73+
Protocol.WriteStreamClosed(FramingBuffer, streamId);
6474
}
6575
else
6676
{
67-
await stream.WriteFrameAsync(framingBuffer, streamId).ConfigureAwait(false);
77+
await stream.WriteFrameAsync(FramingBuffer, streamId).ConfigureAwait(false);
6878
}
6979

7080
// write the buffer on overflow
71-
if (framingBuffer.WrittenCount >= flushThreshold)
81+
if (FramingBuffer.WrittenCount >= FlushThreshold)
7282
{
73-
await SendAsync(framingBuffer.WrittenMemory, socket).ConfigureAwait(false);
74-
framingBuffer.Clear(reuseBuffer: true);
83+
await SendAsync(FramingBuffer.WrittenMemory, socket).ConfigureAwait(false);
84+
FramingBuffer.Clear(reuseBuffer: true);
7585
}
7686
}
7787

7888
// process protocol commands
79-
commands.Serialize(framingBuffer);
89+
Commands.Serialize(FramingBuffer);
8090

81-
switch (framingBuffer.WrittenCount)
91+
switch (FramingBuffer.WrittenCount)
8292
{
8393
case 0 when requiresHeartbeat:
84-
Protocol.WriteHeartbeat(framingBuffer);
94+
Protocol.WriteHeartbeat(FramingBuffer);
8595
goto default;
8696
case 0:
8797
break;
8898
default:
89-
await SendAsync(framingBuffer.WrittenMemory, socket).ConfigureAwait(false);
99+
await SendAsync(FramingBuffer.WrittenMemory, socket).ConfigureAwait(false);
90100
break;
91101
}
92102

@@ -99,7 +109,7 @@ private async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, Socket socket)
99109
{
100110
for (int bytesWritten; !buffer.IsEmpty; buffer = buffer.Slice(bytesWritten))
101111
{
102-
StartOperation(timeout);
112+
StartOperation(Timeout);
103113
try
104114
{
105115
bytesWritten = await socket.SendAsync(buffer, TimeBoundedToken).ConfigureAwait(false);
@@ -121,9 +131,9 @@ private async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, Socket socket)
121131

122132
public async ValueTask CompleteAllAsync(Exception e)
123133
{
124-
foreach (var id in streams.Keys)
134+
foreach (var id in Streams.Keys)
125135
{
126-
if (streams.TryRemove(id, out var stream))
136+
if (Streams.TryRemove(id, out var stream))
127137
{
128138
await stream.CompleteTransportOutputAsync(e).ConfigureAwait(false);
129139
await stream.CompleteTransportInputAsync(e).ConfigureAwait(false);

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedClient.cs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,17 @@ protected MultiplexedClient(Options configuration)
3434
var flushThreshold = configuration.BufferCapacity;
3535
framingBuffer = new PoolingBufferWriter<byte>(configuration.ToAllocator()) { Capacity = flushThreshold };
3636

37-
input = new(new(),
38-
writeSignal,
39-
framingBuffer,
40-
flushThreshold,
41-
configuration.MeasurementTags,
42-
configuration.Timeout,
43-
configuration.HeartbeatTimeout,
44-
lifetimeToken);
37+
input = new()
38+
{
39+
TransportSignal = writeSignal,
40+
FramingBuffer = framingBuffer,
41+
FlushThreshold = flushThreshold,
42+
MeasurementTags = configuration.MeasurementTags,
43+
Timeout = configuration.Timeout,
44+
HeartbeatTimeout = configuration.HeartbeatTimeout,
45+
RootToken = lifetimeToken,
46+
};
47+
4548
output = input.CreateOutput(GC.AllocateArray<byte>(configuration.BufferCapacity, pinned: true), configuration.Timeout);
4649
}
4750

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedListener.Dispatcher.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,16 @@ private async Task DispatchAsync(Socket socket)
3030
var writeSignal = new AsyncAutoResetEvent(initialState: false);
3131
var receiveBuffer = allocator(flushThreshold);
3232
var framingBuffer = new PoolingBufferWriter<byte>(allocator) { Capacity = flushThreshold };
33-
var input = new InputMultiplexer<MultiplexedListener>(
34-
new(),
35-
writeSignal,
36-
framingBuffer,
37-
flushThreshold,
38-
CreateMeasurementTags(socket),
39-
timeout,
40-
heartbeatTimeout,
41-
lifetimeToken);
33+
var input = new InputMultiplexer<MultiplexedListener>
34+
{
35+
TransportSignal = writeSignal,
36+
FramingBuffer = framingBuffer,
37+
MeasurementTags = CreateMeasurementTags(socket),
38+
FlushThreshold = flushThreshold,
39+
Timeout = timeout,
40+
HeartbeatTimeout = heartbeatTimeout,
41+
RootToken = lifetimeToken,
42+
};
4243
var receiveTokenSource = CancellationTokenSource.CreateLinkedTokenSource(lifetimeToken);
4344
var output = input.CreateOutput(
4445
receiveBuffer.Memory,

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/Multiplexer.TimeoutControl.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
1+
using System.Diagnostics.CodeAnalysis;
2+
13
namespace DotNext.Net.Multiplexing;
24

35
using Threading;
46

5-
partial class Multiplexer<T>
7+
partial class Multiplexer
68
{
7-
private TimeoutSource source = new(TimeProvider.System, token);
9+
private TimeoutSource source;
810

9-
protected void StartOperation(TimeSpan timeout) => source.TryStart(timeout);
11+
public required CancellationToken RootToken
12+
{
13+
get => source.RootToken;
1014

11-
public CancellationToken RootToken => source.RootToken;
15+
[MemberNotNull(nameof(source))] init => source = new(TimeProvider.System, value);
16+
}
17+
18+
protected void StartOperation(TimeSpan timeout) => source.TryStart(timeout);
1219

1320
protected CancellationToken TimeBoundedToken => source.Token;
1421

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,36 @@
11
using System.Collections.Concurrent;
22
using System.Diagnostics;
3-
using System.Diagnostics.Metrics;
43

54
namespace DotNext.Net.Multiplexing;
65

7-
internal abstract partial class Multiplexer<T>(
6+
internal abstract partial class Multiplexer(
87
ConcurrentDictionary<uint, MultiplexedStream> streams,
9-
IProducerConsumerCollection<ProtocolCommand> commands,
10-
in TagList measurementTags,
11-
CancellationToken token) : Disposable
12-
where T : IStreamMetrics
8+
IProducerConsumerCollection<ProtocolCommand> commands) : Disposable
139
{
14-
protected readonly TagList measurementTags = measurementTags;
15-
protected readonly ConcurrentDictionary<uint, MultiplexedStream> streams = streams;
16-
protected readonly IProducerConsumerCollection<ProtocolCommand> commands = commands;
17-
18-
protected void ChangeStreamCount(long delta = 1) => T.ChangeStreamCount(delta, measurementTags);
10+
protected readonly IProducerConsumerCollection<ProtocolCommand> Commands = commands;
11+
protected readonly ConcurrentDictionary<uint, MultiplexedStream> Streams = streams;
1912

2013
protected override void Dispose(bool disposing)
2114
{
2215
if (disposing)
2316
{
2417
source.Dispose();
2518
}
26-
19+
2720
base.Dispose(disposing);
2821
}
2922

3023
protected override ValueTask DisposeAsyncCore() => source.DisposeAsync();
3124

3225
public new ValueTask DisposeAsync() => base.DisposeAsync();
26+
}
27+
28+
internal abstract class Multiplexer<T>(
29+
ConcurrentDictionary<uint, MultiplexedStream> streams,
30+
IProducerConsumerCollection<ProtocolCommand> commands) : Multiplexer(streams, commands)
31+
where T : IStreamMetrics
32+
{
33+
public required TagList MeasurementTags;
34+
35+
protected void ChangeStreamCount(long delta = 1) => T.ChangeStreamCount(delta, MeasurementTags);
3336
}

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/OutputMultiplexer.cs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Collections.Concurrent;
2-
using System.Diagnostics;
32
using System.Net.Sockets;
43

54
namespace DotNext.Net.Multiplexing;
@@ -8,14 +7,20 @@ namespace DotNext.Net.Multiplexing;
87

98
internal sealed class OutputMultiplexer<T>(
109
ConcurrentDictionary<uint, MultiplexedStream> streams,
11-
AsyncAutoResetEvent writeSignal,
12-
IProducerConsumerCollection<ProtocolCommand> commands,
13-
Memory<byte> framingBuffer,
14-
in TagList measurementTags,
15-
TimeSpan timeout,
16-
CancellationToken token) : Multiplexer<T>(streams, commands, measurementTags, token)
10+
IProducerConsumerCollection<ProtocolCommand> commands): Multiplexer<T>(streams, commands)
1711
where T : IStreamMetrics
1812
{
13+
private readonly Memory<byte> framingBuffer;
14+
15+
public required AsyncAutoResetEvent TransportSignal { private get; init; }
16+
17+
public required Memory<byte> FramingBuffer
18+
{
19+
init => framingBuffer = value;
20+
}
21+
22+
public required TimeSpan Timeout { private get; init; }
23+
1924
public MultiplexedStreamFactory? Factory { get; init; }
2025

2126
public Task ProcessAsync(Socket socket)
@@ -24,7 +29,7 @@ public Task ProcessAsync(Socket socket)
2429

2530
// if output multiplexer is completed due to exception, we need to trigger
2631
// the input multiplexer to handle the error
27-
task.ConfigureAwait(false).GetAwaiter().UnsafeOnCompleted(writeSignal.SetNoResult);
32+
task.ConfigureAwait(false).GetAwaiter().UnsafeOnCompleted(TransportSignal.SetNoResult);
2833
return task;
2934
}
3035

@@ -33,7 +38,7 @@ private async Task ProcessCoreAsync(Socket socket)
3338
FrameHeader header;
3439
for (var bufferedBytes = 0;; AdjustFramingBuffer(ref bufferedBytes, header, framingBuffer.Span))
3540
{
36-
StartOperation(timeout); // resumed by heartbeat
41+
StartOperation(Timeout); // resumed by heartbeat
3742
try
3843
{
3944
// read at least header
@@ -66,21 +71,21 @@ private async Task ProcessCoreAsync(Socket socket)
6671
if (header.Control is FrameControl.Heartbeat)
6772
continue;
6873

69-
if (!streams.TryGetValue(header.Id, out var stream))
74+
if (!Streams.TryGetValue(header.Id, out var stream))
7075
{
7176
if (Factory is null || header.CanBeIgnored)
7277
{
7378
continue;
7479
}
7580

76-
if ((stream = Factory(writeSignal, measurementTags)) is null)
81+
if ((stream = Factory(TransportSignal, MeasurementTags)) is null)
7782
{
78-
commands.TryAdd(new StreamRejectedCommand(header.Id));
79-
writeSignal.Set();
83+
Commands.TryAdd(new StreamRejectedCommand(header.Id));
84+
TransportSignal.Set();
8085
continue;
8186
}
8287

83-
streams[header.Id] = stream;
88+
Streams[header.Id] = stream;
8489
ChangeStreamCount();
8590
}
8691
else if (stream.IsTransportOutputCompleted)

0 commit comments

Comments
 (0)