Skip to content

Commit 9d7e73b

Browse files
committed
Open stream only when the client is connected
1 parent 3c952f3 commit 9d7e73b

File tree

4 files changed

+34
-36
lines changed

4 files changed

+34
-36
lines changed

src/DotNext.Tests/Net/Multiplexing/TcpMultiplexerTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public static async Task WaitForConnectionAsync()
248248
await using var server = new TcpMultiplexedListener(LocalEndPoint, new() { Timeout = DefaultTimeout });
249249
await server.StartAsync();
250250

251-
var task = client.EnsureConnectedAsync().AsTask();
251+
var task = client.OpenStreamAsync().AsTask();
252252
False(task.IsCompleted);
253253

254254
await client.StartAsync();
@@ -262,7 +262,7 @@ public static async Task WaitForDisposedConnectionAsync()
262262
Task task;
263263
await using (var client = new TcpMultiplexedClient(LocalEndPoint, new() { Timeout = DefaultTimeout }))
264264
{
265-
task = client.EnsureConnectedAsync().AsTask();
265+
task = client.OpenStreamAsync().AsTask();
266266
}
267267

268268
await ThrowsAsync<ObjectDisposedException>(Func.Constant(task));

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/InputMultiplexer.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,14 @@ public bool TryAddStream(uint streamId, MultiplexedStream stream)
2929
ChangeStreamCount(Unsafe.BitCast<bool, byte>(result));
3030
return result;
3131
}
32-
32+
33+
public bool TryRemoveStream(uint streamId, MultiplexedStream stream)
34+
{
35+
var removed = streams.TryRemove(new(streamId, stream));
36+
ChangeStreamCount(-Unsafe.BitCast<bool, byte>(removed));
37+
return removed;
38+
}
39+
3340
public OutputMultiplexer CreateOutput(Memory<byte> framingBuffer, TimeSpan receiveTimeout)
3441
=> new(streams, writeSignal, commands, framingBuffer, streamCounter, measurementTags, receiveTimeout, RootToken);
3542

@@ -52,10 +59,9 @@ public async Task ProcessAsync(Func<bool> condition, Socket socket)
5259
{
5360
var (streamId, stream) = enumerator.Current;
5461

55-
if (stream.IsCompleted && streams.TryRemove(streamId, out _))
62+
if (stream.IsCompleted && TryRemoveStream(streamId, stream))
5663
{
5764
Protocol.WriteStreamClosed(framingBuffer, streamId);
58-
ChangeStreamCount(-1);
5965
}
6066
else
6167
{

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedClient.cs

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ protected MultiplexedClient(Options configuration)
4646
output = input.CreateOutput(GC.AllocateArray<byte>(configuration.BufferCapacity, pinned: true), configuration.Timeout);
4747
}
4848

49-
private Task EnsureConnectedCoreAsync(CancellationToken token)
49+
private Task EnsureConnectedAsync(CancellationToken token)
5050
=> readiness?.Task.WaitAsync(token) ?? Task.CompletedTask;
5151

5252
/// <summary>
@@ -57,7 +57,7 @@ private Task EnsureConnectedCoreAsync(CancellationToken token)
5757
/// <exception cref="ObjectDisposedException">The client is disposed.</exception>
5858
public ValueTask StartAsync(CancellationToken token = default)
5959
{
60-
var task = EnsureConnectedCoreAsync(token);
60+
var task = EnsureConnectedAsync(token);
6161
if (ReferenceEquals(dispatcher, Task.CompletedTask))
6262
{
6363
dispatcher = DispatchAsync();
@@ -66,18 +66,6 @@ public ValueTask StartAsync(CancellationToken token = default)
6666
return new(task);
6767
}
6868

69-
/// <summary>
70-
/// Waits for the connection to be established.
71-
/// </summary>
72-
/// <remarks>
73-
/// The method can be called to ensure that the connection to the server is established successfully.
74-
/// This is useful when the underlying connection is lost to prevent inflation of the stream IDs.
75-
/// </remarks>
76-
/// <param name="token">The token that can be used to cancel the operation.</param>
77-
/// <returns>The task representing connection state.</returns>
78-
public ValueTask EnsureConnectedAsync(CancellationToken token = default)
79-
=> new(EnsureConnectedCoreAsync(token));
80-
8169
/// <summary>
8270
/// Creates a new multiplexed client stream.
8371
/// </summary>
@@ -99,29 +87,31 @@ public ValueTask EnsureConnectedAsync(CancellationToken token = default)
9987
/// </remarks>
10088
/// <returns>A duplex pipe for data input/output.</returns>
10189
/// <seealso cref="DotNext.IO.Pipelines.DuplexStream"/>
102-
public ValueTask<IDuplexPipe> OpenStreamAsync(CancellationToken token = default)
103-
{
104-
var readinessCopy = readiness;
105-
return readinessCopy is null or { Task.IsCompletedSuccessfully: true }
106-
? new(OpenStream())
107-
: OpenStreamCoreAsync(readinessCopy.Task, token);
108-
}
109-
110-
private async ValueTask<IDuplexPipe> OpenStreamCoreAsync(Task readinessTask, CancellationToken token)
90+
public async ValueTask<IDuplexPipe> OpenStreamAsync(CancellationToken token = default)
11191
{
112-
await readinessTask.WaitAsync(token).ConfigureAwait(false);
113-
return OpenStream();
92+
for (var stream = OpenStream(out var addedStreamId);; token.ThrowIfCancellationRequested())
93+
{
94+
try
95+
{
96+
await EnsureConnectedAsync(token).ConfigureAwait(false);
97+
return stream;
98+
}
99+
catch (Exception e)
100+
{
101+
input.TryRemoveStream(addedStreamId, stream);
102+
await stream.AbortAppSideAsync(e).ConfigureAwait(false);
103+
}
104+
}
114105
}
115106

116-
private MultiplexedStream OpenStream()
107+
private MultiplexedStream OpenStream(out uint id)
117108
{
118109
var stream = new MultiplexedStream(options, writeSignal);
119-
uint id;
120110
do
121111
{
122112
id = Interlocked.Increment(ref streamId);
123113
} while (!input.TryAddStream(id, stream));
124-
114+
125115
return stream;
126116
}
127117

src/cluster/DotNext.Net.Cluster/Net/Multiplexing/MultiplexedStream.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,12 @@ private bool IsTransportInputCompleted(out bool appSideCompleted)
8585
return (stateCopy & TransportInputCompletedState) is not 0U;
8686
}
8787

88-
public async ValueTask AbortAppSideAsync()
88+
public async ValueTask AbortAppSideAsync(Exception? e = null)
8989
{
90-
var e = new ConnectionAbortedException();
91-
ExceptionDispatchInfo.SetCurrentStackTrace(e);
90+
if (e is null)
91+
{
92+
ExceptionDispatchInfo.SetCurrentStackTrace(e = new ConnectionAbortedException());
93+
}
9294

9395
await appWriter.CompleteAsync(e).ConfigureAwait(false);
9496
await appReader.CompleteAsync(e).ConfigureAwait(false);

0 commit comments

Comments
 (0)