Skip to content

Commit c51afc4

Browse files
committed
Added a way to avoid stream ID inflation on the client side in the case of unstable TCP connection
1 parent 4732b85 commit c51afc4

File tree

3 files changed

+84
-7
lines changed

3 files changed

+84
-7
lines changed

src/DotNext.Tests/Net/Multiplexing/TcpMultiplexerTests.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,32 @@ protected override void Record(int value)
239239

240240
public Task WaitForZero(TimeSpan timeout) => zeroReached.Task.WaitAsync(timeout);
241241
}
242+
243+
[Fact]
244+
public static async Task WaitForConnectionAsync()
245+
{
246+
await using var client = new TcpMultiplexedClient(LocalEndPoint, new() { Timeout = DefaultTimeout });
247+
248+
await using var server = new TcpMultiplexedListener(LocalEndPoint, new() { Timeout = DefaultTimeout });
249+
await server.StartAsync();
250+
251+
var task = client.WaitForConnectionAsync().AsTask();
252+
False(task.IsCompleted);
253+
254+
await client.StartAsync();
255+
await task;
256+
True(task.IsCompletedSuccessfully);
257+
}
258+
259+
[Fact]
260+
public static async Task WaitForDisposedConnectionAsync()
261+
{
262+
Task task;
263+
await using (var client = new TcpMultiplexedClient(LocalEndPoint, new() { Timeout = DefaultTimeout }))
264+
{
265+
task = client.WaitForConnectionAsync().AsTask();
266+
}
267+
268+
await ThrowsAsync<ObjectDisposedException>(Func.Constant(task));
269+
}
242270
}

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedClient.Dispatcher.cs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Diagnostics.CodeAnalysis;
22
using System.IO.Pipelines;
33
using System.Net.Sockets;
4+
using System.Runtime.ExceptionServices;
45
using Microsoft.AspNetCore.Connections;
56

67
namespace DotNext.Net.Multiplexing;
@@ -24,6 +25,30 @@ partial class MultiplexedClient
2425
private readonly OutputMultiplexer output;
2526
private uint streamId;
2627

28+
private void ReportConnected()
29+
=> Interlocked.Exchange(ref readiness, null)?.TrySetResult();
30+
31+
private void ReportDisconnected()
32+
{
33+
if (readiness is null)
34+
{
35+
var source = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
36+
Interlocked.CompareExchange(ref readiness, source, null);
37+
}
38+
}
39+
40+
private void ReportDisposed()
41+
{
42+
var e = new ObjectDisposedException(GetType().Name);
43+
ExceptionDispatchInfo.SetCurrentStackTrace(e);
44+
if (readiness?.TrySetException(e) is null)
45+
{
46+
var source = new TaskCompletionSource();
47+
source.SetException(e);
48+
Interlocked.CompareExchange(ref readiness, source, null);
49+
}
50+
}
51+
2752
private async Task DispatchAsync()
2853
{
2954
var socket = default(Socket);
@@ -43,7 +68,7 @@ private async Task DispatchAsync()
4368
{
4469
socket = await ConnectAsync(input.RootToken).ConfigureAwait(false);
4570
receiveLoop = output.ProcessAsync(socket);
46-
readiness.TrySetResult();
71+
ReportConnected();
4772
}
4873

4974
// send data
@@ -61,6 +86,8 @@ await input.CompleteAllAsync(new ConnectionResetException(ExceptionMessages.Conn
6186
{
6287
socket?.Dispose();
6388
await receiveLoop.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
89+
90+
ReportDisconnected();
6491
await input.CompleteAllAsync(e).ConfigureAwait(false);
6592
}
6693
}

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedClient.cs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace DotNext.Net.Multiplexing;
1313
[Experimental("DOTNEXT001")]
1414
public abstract partial class MultiplexedClient : Disposable, IAsyncDisposable
1515
{
16-
private readonly TaskCompletionSource readiness;
16+
private volatile TaskCompletionSource? readiness;
1717
private Task dispatcher;
1818

1919
[SuppressMessage("Usage", "CA2213", Justification = "False positive")]
@@ -46,6 +46,9 @@ protected MultiplexedClient(Options configuration)
4646
output = input.CreateOutput(GC.AllocateArray<byte>(configuration.BufferCapacity, pinned: true), configuration.Timeout);
4747
}
4848

49+
private Task WaitForConnectionCoreAsync(CancellationToken token)
50+
=> readiness?.Task.WaitAsync(token) ?? Task.CompletedTask;
51+
4952
/// <summary>
5053
/// Connects to the server and starts the dispatching loop.
5154
/// </summary>
@@ -54,14 +57,27 @@ protected MultiplexedClient(Options configuration)
5457
/// <exception cref="ObjectDisposedException">The client is disposed.</exception>
5558
public ValueTask StartAsync(CancellationToken token = default)
5659
{
60+
var task = WaitForConnectionCoreAsync(token);
5761
if (ReferenceEquals(dispatcher, Task.CompletedTask))
5862
{
5963
dispatcher = DispatchAsync();
6064
}
6165

62-
return new(readiness.Task.WaitAsync(token));
66+
return new(task);
6367
}
6468

69+
/// <summary>
70+
/// Waits for the connection to be established.
71+
/// </summary>
72+
/// <remarks>
73+
/// The method can be called to ensure that the connection to the server is established successfully.
74+
/// This is useful when the underlying connection is lost to prevent inflation of the stream IDs.
75+
/// </remarks>
76+
/// <param name="token">The token that can be used to cancel the operation.</param>
77+
/// <returns>The task representing connection state.</returns>
78+
public ValueTask WaitForConnectionAsync(CancellationToken token = default)
79+
=> new(WaitForConnectionCoreAsync(token));
80+
6581
/// <summary>
6682
/// Creates a new multiplexed client stream.
6783
/// </summary>
@@ -84,11 +100,16 @@ public ValueTask StartAsync(CancellationToken token = default)
84100
/// <returns>A duplex pipe for data input/output.</returns>
85101
/// <seealso cref="DotNext.IO.Pipelines.DuplexStream"/>
86102
public ValueTask<IDuplexPipe> OpenStreamAsync(CancellationToken token = default)
87-
=> readiness.Task.IsCompletedSuccessfully ? new(OpenStream()) : OpenStreamCoreAsync(token);
103+
{
104+
var readinessCopy = readiness;
105+
return readinessCopy is null or { Task.IsCompletedSuccessfully: true }
106+
? new(OpenStream())
107+
: OpenStreamCoreAsync(readinessCopy.Task, token);
108+
}
88109

89-
private async ValueTask<IDuplexPipe> OpenStreamCoreAsync(CancellationToken token)
110+
private async ValueTask<IDuplexPipe> OpenStreamCoreAsync(Task readinessTask, CancellationToken token)
90111
{
91-
await readiness.Task.WaitAsync(token).ConfigureAwait(false);
112+
await readinessTask.WaitAsync(token).ConfigureAwait(false);
92113
return OpenStream();
93114
}
94115

@@ -130,7 +151,6 @@ private void Cancel()
130151
{
131152
if (Interlocked.Exchange(ref lifetimeTokenSource, null) is { } cts)
132153
{
133-
readiness.TrySetException(new ObjectDisposedException(GetType().Name));
134154
using (cts)
135155
{
136156
cts.Cancel();
@@ -143,6 +163,8 @@ protected override async ValueTask DisposeAsyncCore()
143163
{
144164
Cancel();
145165
await dispatcher.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
166+
ReportDisposed();
167+
146168
await input.DisposeAsync().ConfigureAwait(false);
147169
await output.DisposeAsync().ConfigureAwait(false);
148170
writeSignal.Dispose();

0 commit comments

Comments
 (0)