Skip to content

Commit 609bb4f

Browse files
committed
Added metrics for pending streams count
1 parent ddb7c39 commit 609bb4f

13 files changed

+75
-47
lines changed

src/DotNext.Tests/Net/Multiplexing/TcpMultiplexerTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,15 @@ public static async Task TerminateStream()
223223
await streamCount.WaitForZero(DefaultTimeout);
224224
}
225225

226-
private sealed class StreamCountObserver() : InstrumentObserver<int, UpDownCounter<int>>(static (instr, tags) => IsStreamCount(instr))
226+
private sealed class StreamCountObserver() : InstrumentObserver<long, UpDownCounter<long>>(static (instr, tags) => IsStreamCount(instr))
227227
{
228228
private readonly TaskCompletionSource zeroReached = new();
229-
private int streamCount;
229+
private long streamCount;
230230

231231
internal static bool IsStreamCount(Instrument instrument)
232232
=> instrument is { Meter.Name: "DotNext.Net.Multiplexing.Server", Name: "streams-count" };
233233

234-
protected override void Record(int value)
234+
protected override void Record(long value)
235235
{
236236
if (Interlocked.Add(ref streamCount, value) is 0)
237237
zeroReached.TrySetResult();
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using System.Diagnostics;
2+
3+
namespace DotNext.Net.Multiplexing;
4+
5+
internal interface IStreamMetrics
6+
{
7+
static abstract void ChangeStreamCount(long delta, in TagList measurementTags);
8+
}

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/InputMultiplexer.cs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System.Collections.Concurrent;
22
using System.Diagnostics;
3-
using System.Diagnostics.Metrics;
43
using System.Net.Sockets;
54
using System.Runtime.CompilerServices;
65

@@ -9,16 +8,16 @@ namespace DotNext.Net.Multiplexing;
98
using Buffers;
109
using Threading;
1110

12-
internal sealed class InputMultiplexer(
11+
internal sealed class InputMultiplexer<T>(
1312
ConcurrentDictionary<uint, MultiplexedStream> streams,
1413
AsyncAutoResetEvent writeSignal,
1514
BufferWriter<byte> framingBuffer,
1615
int flushThreshold,
17-
UpDownCounter<int> streamCounter,
1816
in TagList measurementTags,
1917
TimeSpan timeout,
2018
TimeSpan heartbeatTimeout,
21-
CancellationToken token) : Multiplexer(streams, new ConcurrentQueue<ProtocolCommand>(), streamCounter, measurementTags, token)
19+
CancellationToken token) : Multiplexer<T>(streams, new ConcurrentQueue<ProtocolCommand>(), measurementTags, token)
20+
where T : IStreamMetrics
2221
{
2322

2423
public TimeSpan Timeout => timeout;
@@ -37,13 +36,13 @@ public bool TryRemoveStream(uint streamId, MultiplexedStream stream)
3736
return removed;
3837
}
3938

40-
public OutputMultiplexer CreateOutput(Memory<byte> framingBuffer, TimeSpan receiveTimeout)
41-
=> new(streams, writeSignal, commands, framingBuffer, streamCounter, measurementTags, receiveTimeout, RootToken);
39+
public OutputMultiplexer<T> CreateOutput(Memory<byte> framingBuffer, TimeSpan receiveTimeout)
40+
=> new(streams, writeSignal, commands, framingBuffer, measurementTags, receiveTimeout, RootToken);
4241

43-
public OutputMultiplexer CreateOutput(Memory<byte> framingBuffer, TimeSpan receiveTimeout, Func<AsyncAutoResetEvent, MultiplexedStream?> handlerFactory,
42+
public OutputMultiplexer<T> CreateOutput(Memory<byte> framingBuffer, TimeSpan receiveTimeout, MultiplexedStreamFactory handlerFactory,
4443
CancellationToken token)
45-
=> new(streams, writeSignal, commands, framingBuffer, streamCounter, measurementTags, receiveTimeout, token)
46-
{ HandlerFactory = handlerFactory };
44+
=> new(streams, writeSignal, commands, framingBuffer, measurementTags, receiveTimeout, token)
45+
{ Factory = handlerFactory };
4746

4847
public async Task ProcessAsync(Func<bool> condition, Socket socket)
4948
{

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedClient.Dispatcher.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ partial class MultiplexedClient
1919
private readonly PipeOptions options;
2020

2121
[SuppressMessage("Usage", "CA2213", Justification = "False positive")]
22-
private readonly InputMultiplexer input;
22+
private readonly InputMultiplexer<MultiplexedClient> input;
2323

2424
[SuppressMessage("Usage", "CA2213", Justification = "False positive")]
25-
private readonly OutputMultiplexer output;
25+
private readonly OutputMultiplexer<MultiplexedClient> output;
2626
private uint streamId;
2727

2828
private void ReportConnected()
Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1+
using System.Diagnostics;
12
using System.Diagnostics.Metrics;
23

34
namespace DotNext.Net.Multiplexing;
45

5-
partial class MultiplexedClient
6+
partial class MultiplexedClient : IStreamMetrics
67
{
7-
private static readonly UpDownCounter<int> streamCount;
8+
private static readonly UpDownCounter<long> StreamCount;
89

910
static MultiplexedClient()
1011
{
1112
var meter = new Meter("DotNext.Net.Multiplexing.Client");
12-
streamCount = meter.CreateUpDownCounter<int>("streams-count", description: "Number of Streams");
13+
StreamCount = meter.CreateUpDownCounter<long>("streams-count", description: "Number of Streams");
1314
}
15+
16+
static void IStreamMetrics.ChangeStreamCount(long delta, in TagList measurementTags)
17+
=> StreamCount.Add(delta, measurementTags);
1418
}

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedClient.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ protected MultiplexedClient(Options configuration)
3838
writeSignal,
3939
framingBuffer,
4040
flushThreshold,
41-
streamCount,
4241
configuration.MeasurementTags,
4342
configuration.Timeout,
4443
configuration.HeartbeatTimeout,

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedListener.Dispatcher.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,11 @@ private async Task DispatchAsync(Socket socket)
3030
var writeSignal = new AsyncAutoResetEvent(initialState: false);
3131
var receiveBuffer = allocator(flushThreshold);
3232
var framingBuffer = new PoolingBufferWriter<byte>(allocator) { Capacity = flushThreshold };
33-
var input = new InputMultiplexer(
33+
var input = new InputMultiplexer<MultiplexedListener>(
3434
new(),
3535
writeSignal,
3636
framingBuffer,
3737
flushThreshold,
38-
streamCount,
3938
CreateMeasurementTags(socket),
4039
timeout,
4140
heartbeatTimeout,

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedListener.Metrics.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,21 @@
33

44
namespace DotNext.Net.Multiplexing;
55

6-
partial class MultiplexedListener
6+
partial class MultiplexedListener : IStreamMetrics
77
{
88
private const string ClientAddressMeterAttribute = "dotnext.net.multiplexing.client.address";
9-
10-
private static readonly UpDownCounter<int> streamCount;
9+
10+
private static readonly UpDownCounter<long> StreamCount, PendingStreamCount;
1111

1212
static MultiplexedListener()
1313
{
1414
var meter = new Meter("DotNext.Net.Multiplexing.Server");
15-
streamCount = meter.CreateUpDownCounter<int>("streams-count", description: "Number of Streams");
15+
StreamCount = meter.CreateUpDownCounter<long>("streams-count", description: "Number of Streams");
16+
PendingStreamCount = meter.CreateUpDownCounter<long>("pending-streams-count", description: "Number of Pending Streams");
1617
}
1718

1819
private readonly TagList measurementTags;
20+
21+
static void IStreamMetrics.ChangeStreamCount(long delta, in TagList measurementTags)
22+
=> StreamCount.Add(delta, measurementTags);
1923
}

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedListener.cs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Diagnostics;
12
using System.Diagnostics.CodeAnalysis;
23
using System.IO.Pipelines;
34
using System.Net.Sockets;
@@ -20,7 +21,7 @@ public abstract partial class MultiplexedListener : Disposable, IAsyncDisposable
2021
private readonly MemoryAllocator<byte> allocator;
2122
private readonly int flushThreshold;
2223
private readonly TaskCompletionSource readiness;
23-
private readonly Func<AsyncAutoResetEvent, MultiplexedStream?> streamFactory;
24+
private readonly MultiplexedStreamFactory streamFactory;
2425
private Task listener;
2526

2627
[SuppressMessage("Usage", "CA2213", Justification = "False positive")]
@@ -41,7 +42,7 @@ protected MultiplexedListener(Options configuration)
4142
});
4243

4344
allocator = configuration.ToAllocator();
44-
streamFactory = new MultiplexedStreamFactory(configuration.BufferOptions, backlog.Writer).CreateStream;
45+
streamFactory = new MultiplexedStreamFactoryImpl(configuration.BufferOptions, backlog.Writer);
4546
measurementTags = configuration.MeasurementTags;
4647
flushThreshold = configuration.BufferCapacity;
4748
readiness = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -80,6 +81,7 @@ public async ValueTask<IDuplexPipe> AcceptAsync(CancellationToken token = defaul
8081
throw new ObjectDisposedException(GetType().Name);
8182
}
8283

84+
PendingStreamCount.Add(-1L);
8385
return result;
8486
}
8587

@@ -188,15 +190,24 @@ protected override async ValueTask DisposeAsyncCore()
188190

189191
/// <inheritdoc/>
190192
public new ValueTask DisposeAsync() => base.DisposeAsync();
191-
}
192193

193-
file sealed class MultiplexedStreamFactory(PipeOptions options, ChannelWriter<MultiplexedStream> backlog)
194-
{
195-
public MultiplexedStream? CreateStream(AsyncAutoResetEvent writeSignal)
194+
private sealed class MultiplexedStreamFactoryImpl(PipeOptions options, ChannelWriter<MultiplexedStream> backlog)
196195
{
197-
var stream = new MultiplexedStream(options, writeSignal);
198-
return backlog.TryWrite(stream)
199-
? stream
200-
: null;
196+
private MultiplexedStream? CreateStream(AsyncAutoResetEvent writeSignal, in TagList measurementTags)
197+
{
198+
var stream = new MultiplexedStream(options, writeSignal);
199+
if (backlog.TryWrite(stream))
200+
{
201+
PendingStreamCount.Add(1L, measurementTags);
202+
}
203+
else
204+
{
205+
stream = null;
206+
}
207+
208+
return stream;
209+
}
210+
211+
public static implicit operator MultiplexedStreamFactory(MultiplexedStreamFactoryImpl impl) => impl.CreateStream;
201212
}
202213
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
using System.Diagnostics;
2+
3+
namespace DotNext.Net.Multiplexing;
4+
5+
using Threading;
6+
7+
internal delegate MultiplexedStream? MultiplexedStreamFactory(AsyncAutoResetEvent transportSignal, in TagList measurementTags);

0 commit comments

Comments
 (0)