Skip to content

Commit 1a08b13

Browse files
committed
Reduced frame header size
1 parent 862f395 commit 1a08b13

File tree

11 files changed

+28
-28
lines changed

11 files changed

+28
-28
lines changed

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/FrameControl.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ namespace DotNext.Net.Multiplexing;
33
/// <summary>
44
/// Controls the fragment behavior.
55
/// </summary>
6-
internal enum FrameControl : ushort
6+
internal enum FrameControl : byte
77
{
88
/// <summary>
99
/// The fragment contains a data chunk.
@@ -41,5 +41,5 @@ internal enum FrameControl : ushort
4141
/// <summary>
4242
/// Heartbeat system fragment.
4343
/// </summary>
44-
Heartbeat = 1024,
44+
Heartbeat = 128,
4545
}

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/FrameHeader.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,19 @@ namespace DotNext.Net.Multiplexing;
1212
/// <param name="control">The fragment behavior.</param>
1313
/// <param name="length">The length of the fragment data.</param>
1414
[StructLayout(LayoutKind.Auto)]
15-
internal readonly struct FrameHeader(ulong id, FrameControl control, ushort length) : IBinaryFormattable<FrameHeader>
15+
internal readonly struct FrameHeader(uint id, FrameControl control, ushort length) : IBinaryFormattable<FrameHeader>
1616
{
1717
private const byte CurrentVersion = 0;
1818

1919
/// <summary>
2020
/// All protocol-specific fragments have ID = 0.
2121
/// </summary>
22-
public const ulong SystemStreamId = 0U;
22+
public const uint SystemStreamId = 0U;
2323

24-
public const int Size = sizeof(byte) + sizeof(ulong) + sizeof(FrameControl) + sizeof(ushort);
24+
// Version(1 byte) StreamId(4 bytes) FrameControl(1 byte) PayloadLength(2 bytes)
25+
public const int Size = sizeof(byte) + sizeof(uint) + sizeof(FrameControl) + sizeof(ushort);
2526

26-
public ulong Id => id; // if 0, then the packet is protocol-specific
27+
public uint Id => id; // if 0, then the packet is protocol-specific
2728
public FrameControl Control => control;
2829
public ushort Length => length;
2930

@@ -36,7 +37,7 @@ public void Format(Span<byte> destination)
3637
var writer = new SpanWriter<byte>(destination);
3738
writer.Add(CurrentVersion);
3839
writer.WriteLittleEndian(id);
39-
writer.WriteLittleEndian((ushort)control);
40+
writer.Add((byte)control);
4041
writer.WriteLittleEndian(length);
4142
}
4243

@@ -47,10 +48,10 @@ public static FrameHeader Parse(ReadOnlySpan<byte> source)
4748

4849
if (version > CurrentVersion)
4950
throw new UnsupportedVersionException(version);
50-
51+
5152
return new(
52-
reader.ReadLittleEndian<ulong>(),
53-
(FrameControl)reader.ReadLittleEndian<ushort>(),
53+
reader.ReadLittleEndian<uint>(),
54+
(FrameControl)reader.Read(),
5455
reader.ReadLittleEndian<ushort>());
5556
}
5657
}

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/InputMultiplexer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace DotNext.Net.Multiplexing;
1010
using Threading;
1111

1212
internal sealed class InputMultiplexer(
13-
ConcurrentDictionary<ulong, MultiplexedStream> streams,
13+
ConcurrentDictionary<uint, MultiplexedStream> streams,
1414
AsyncAutoResetEvent writeSignal,
1515
BufferWriter<byte> framingBuffer,
1616
int flushThreshold,
@@ -23,7 +23,7 @@ internal sealed class InputMultiplexer(
2323

2424
public TimeSpan Timeout => timeout;
2525

26-
public bool TryAddStream(ulong streamId, MultiplexedStream stream)
26+
public bool TryAddStream(uint streamId, MultiplexedStream stream)
2727
{
2828
var result = streams.TryAdd(streamId, stream);
2929
ChangeStreamCount(Unsafe.BitCast<bool, byte>(result));

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedClient.Dispatcher.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Diagnostics;
21
using System.Diagnostics.CodeAnalysis;
32
using System.IO.Pipelines;
43
using System.Net.Sockets;
@@ -23,7 +22,7 @@ partial class MultiplexedClient
2322

2423
[SuppressMessage("Usage", "CA2213", Justification = "False positive")]
2524
private readonly OutputMultiplexer output;
26-
private ulong streamId;
25+
private uint streamId;
2726

2827
private async Task DispatchAsync()
2928
{

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private async ValueTask<IDuplexPipe> OpenStreamCoreAsync(CancellationToken token
9595
private MultiplexedStream OpenStream()
9696
{
9797
var stream = new MultiplexedStream(options, writeSignal);
98-
ulong id;
98+
uint id;
9999
do
100100
{
101101
id = Interlocked.Increment(ref streamId);

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedStream.TransportSideReader.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ partial class MultiplexedStream
1111
// this is the maximum number of bytes that we can send without adjusting the window
1212
private volatile int inputWindow;
1313

14-
public ValueTask WriteFrameAsync(IBufferWriter<byte> writer, ulong streamId)
14+
public ValueTask WriteFrameAsync(IBufferWriter<byte> writer, uint streamId)
1515
{
1616
ValueTask task;
1717
if (!IsTransportInputCompleted(out var appSideCompleted))
@@ -64,15 +64,15 @@ private ValueTask CloseAndCancelOutputAsync()
6464
return task;
6565
}
6666

67-
private ValueTask WriteFrameAsync(IBufferWriter<byte> writer, ulong streamId, in ReadResult result)
67+
private ValueTask WriteFrameAsync(IBufferWriter<byte> writer, uint streamId, in ReadResult result)
6868
{
6969
AdjustWindowIfNeeded(writer, streamId);
7070
return WriteFrame(writer, streamId, result)
7171
? CompleteTransportInputAsync()
7272
: ValueTask.CompletedTask;
7373
}
7474

75-
private bool WriteFrame(Span<byte> frameBuffer, ulong streamId, in ReadResult readResult, out int bytesWritten, out SequencePosition consumed)
75+
private bool WriteFrame(Span<byte> frameBuffer, uint streamId, in ReadResult readResult, out int bytesWritten, out SequencePosition consumed)
7676
{
7777
Debug.Assert(frameBuffer.Length > FrameHeader.Size);
7878

@@ -117,7 +117,7 @@ private bool WriteFrame(Span<byte> frameBuffer, ulong streamId, in ReadResult re
117117
return completed;
118118
}
119119

120-
private bool WriteFrame(IBufferWriter<byte> writer, ulong streamId, in ReadResult result)
120+
private bool WriteFrame(IBufferWriter<byte> writer, uint streamId, in ReadResult result)
121121
{
122122
var buffer = writer.GetSpan(frameAndHeaderSize);
123123
var completed = WriteFrame(

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedStream.TransportSideWriter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ void IApplicationSideStream.Consume(long count)
7777
transportSignal.Set();
7878
}
7979

80-
private void AdjustWindowIfNeeded(IBufferWriter<byte> writer, ulong streamId)
80+
private void AdjustWindowIfNeeded(IBufferWriter<byte> writer, uint streamId)
8181
{
8282
var totalReceived = Atomic.Read(in bytesReceived);
8383
if (totalReceived > resumeWatermark)

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/Multiplexer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
namespace DotNext.Net.Multiplexing;
66

77
internal abstract partial class Multiplexer(
8-
ConcurrentDictionary<ulong, MultiplexedStream> streams,
8+
ConcurrentDictionary<uint, MultiplexedStream> streams,
99
IProducerConsumerCollection<ProtocolCommand> commands,
1010
UpDownCounter<int> streamCounter,
1111
in TagList measurementTags,
1212
CancellationToken token) : Disposable
1313
{
1414
protected readonly UpDownCounter<int> streamCounter = streamCounter;
1515
protected readonly TagList measurementTags = measurementTags;
16-
protected readonly ConcurrentDictionary<ulong, MultiplexedStream> streams = streams;
16+
protected readonly ConcurrentDictionary<uint, MultiplexedStream> streams = streams;
1717
protected readonly IProducerConsumerCollection<ProtocolCommand> commands = commands;
1818

1919
protected void ChangeStreamCount(int delta = 1) => streamCounter.Add(delta, in measurementTags);

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/OutputMultiplexer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace DotNext.Net.Multiplexing;
88
using Threading;
99

1010
internal sealed class OutputMultiplexer(
11-
ConcurrentDictionary<ulong, MultiplexedStream> streams,
11+
ConcurrentDictionary<uint, MultiplexedStream> streams,
1212
AsyncAutoResetEvent writeSignal,
1313
IProducerConsumerCollection<ProtocolCommand> commands,
1414
Memory<byte> framingBuffer,

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/Protocol.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace DotNext.Net.Multiplexing;
77

88
internal static class Protocol
99
{
10-
private static void WriteEmptyFrame(IBufferWriter<byte> writer, [ConstantExpected] FrameControl control, ulong streamId)
10+
private static void WriteEmptyFrame(IBufferWriter<byte> writer, [ConstantExpected] FrameControl control, uint streamId)
1111
{
1212
var header = new FrameHeader(streamId, control, length: 0);
1313
header.Format(writer.GetSpan(FrameHeader.Size));
@@ -17,13 +17,13 @@ private static void WriteEmptyFrame(IBufferWriter<byte> writer, [ConstantExpecte
1717
public static void WriteHeartbeat(IBufferWriter<byte> writer)
1818
=> WriteEmptyFrame(writer, FrameControl.Heartbeat, FrameHeader.SystemStreamId);
1919

20-
public static void WriteStreamRejected(IBufferWriter<byte> writer, ulong streamId)
20+
public static void WriteStreamRejected(IBufferWriter<byte> writer, uint streamId)
2121
=> WriteEmptyFrame(writer, FrameControl.StreamRejected, streamId);
2222

23-
public static void WriteStreamClosed(IBufferWriter<byte> writer, ulong streamId)
23+
public static void WriteStreamClosed(IBufferWriter<byte> writer, uint streamId)
2424
=> WriteEmptyFrame(writer, FrameControl.StreamClosed, streamId);
2525

26-
public static void WriteAdjustWindow(IBufferWriter<byte> writer, ulong streamId, int windowSize)
26+
public static void WriteAdjustWindow(IBufferWriter<byte> writer, uint streamId, int windowSize)
2727
{
2828
const int messageSize = FrameHeader.Size + sizeof(int);
2929

0 commit comments

Comments
 (0)