Skip to content

Commit b035659

Browse files
committed
MqttChannel: Exception handling remains consistent with MqttChannelAdapter.
1 parent efcd6ab commit b035659

File tree

8 files changed

+137
-74
lines changed

8 files changed

+137
-74
lines changed

Source/MQTTnet.AspnetCore/Internal/MqttChannel.cs

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
using MQTTnet.Packets;
1313
using System;
1414
using System.Buffers;
15+
using System.IO;
1516
using System.IO.Pipelines;
1617
using System.Net;
18+
using System.Net.Sockets;
19+
using System.Runtime.InteropServices;
1720
using System.Security.Cryptography.X509Certificates;
1821
using System.Threading;
1922
using System.Threading.Tasks;
@@ -106,8 +109,18 @@ private static bool IsTlsConnection(IHttpContextFeature? _httpContextFeature, IT
106109

107110
public async Task DisconnectAsync()
108111
{
109-
await _input.CompleteAsync().ConfigureAwait(false);
110-
await _output.CompleteAsync().ConfigureAwait(false);
112+
try
113+
{
114+
await _input.CompleteAsync().ConfigureAwait(false);
115+
await _output.CompleteAsync().ConfigureAwait(false);
116+
}
117+
catch (Exception exception)
118+
{
119+
if (!WrapAndThrowException(exception))
120+
{
121+
throw;
122+
}
123+
}
111124
}
112125

113126
public virtual void Dispose()
@@ -116,6 +129,29 @@ public virtual void Dispose()
116129
}
117130

118131
public async Task<MqttPacket?> ReceivePacketAsync(CancellationToken cancellationToken)
132+
{
133+
try
134+
{
135+
return await ReceivePacketCoreAsync(cancellationToken).ConfigureAwait(false);
136+
}
137+
catch (OperationCanceledException)
138+
{
139+
}
140+
catch (ObjectDisposedException)
141+
{
142+
}
143+
catch (Exception exception)
144+
{
145+
if (!WrapAndThrowException(exception))
146+
{
147+
throw;
148+
}
149+
}
150+
151+
return null;
152+
}
153+
154+
private async Task<MqttPacket?> ReceivePacketCoreAsync(CancellationToken cancellationToken)
119155
{
120156
try
121157
{
@@ -143,7 +179,7 @@ public virtual void Dispose()
143179
{
144180
if (!buffer.IsEmpty)
145181
{
146-
if (PacketFormatterAdapter.TryDecode(buffer,_packetInspector, out var packet, out consumed, out observed, out var received))
182+
if (PacketFormatterAdapter.TryDecode(buffer, _packetInspector, out var packet, out consumed, out observed, out var received))
147183
{
148184
BytesReceived += received;
149185

@@ -168,11 +204,11 @@ public virtual void Dispose()
168204
}
169205
}
170206
}
171-
catch (Exception exception)
207+
catch (Exception)
172208
{
173209
// completing the channel makes sure that there is no more data read after a protocol error
174-
_input.Complete(exception);
175-
_output.Complete(exception);
210+
await _input.CompleteAsync().ConfigureAwait(false);
211+
await _output.CompleteAsync().ConfigureAwait(false);
176212

177213
throw;
178214
}
@@ -188,6 +224,21 @@ public void ResetStatistics()
188224
}
189225

190226
public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellationToken)
227+
{
228+
try
229+
{
230+
await SendPacketCoreAsync(packet, cancellationToken).ConfigureAwait(false);
231+
}
232+
catch (Exception exception)
233+
{
234+
if (!WrapAndThrowException(exception))
235+
{
236+
throw;
237+
}
238+
}
239+
}
240+
241+
private async Task SendPacketCoreAsync(MqttPacket packet, CancellationToken cancellationToken)
191242
{
192243
using (await _writerLock.EnterAsync(cancellationToken).ConfigureAwait(false))
193244
{
@@ -241,4 +292,44 @@ static void WritePacketBuffer(PipeWriter output, MqttPacketBuffer buffer)
241292
buffer.Payload.CopyTo(destination: span.Slice(offset));
242293
output.Advance(buffer.Length);
243294
}
295+
296+
public static bool WrapAndThrowException(Exception exception)
297+
{
298+
if (exception is OperationCanceledException ||
299+
exception is MqttCommunicationTimedOutException ||
300+
exception is MqttCommunicationException ||
301+
exception is MqttProtocolViolationException)
302+
{
303+
return false;
304+
}
305+
306+
if (exception is IOException && exception.InnerException is SocketException innerException)
307+
{
308+
exception = innerException;
309+
}
310+
311+
if (exception is SocketException socketException)
312+
{
313+
if (socketException.SocketErrorCode == SocketError.OperationAborted)
314+
{
315+
throw new OperationCanceledException();
316+
}
317+
318+
if (socketException.SocketErrorCode == SocketError.ConnectionAborted)
319+
{
320+
throw new MqttCommunicationException(socketException);
321+
}
322+
}
323+
324+
if (exception is COMException comException)
325+
{
326+
const uint ErrorOperationAborted = 0x800703E3;
327+
if ((uint)comException.HResult == ErrorOperationAborted)
328+
{
329+
throw new OperationCanceledException();
330+
}
331+
}
332+
333+
throw new MqttCommunicationException(exception);
334+
}
244335
}

Source/MQTTnet.AspnetCore/Internal/MqttClientChannelAdapter.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,23 @@ public MqttClientChannelAdapter(
5252

5353
public async Task ConnectAsync(CancellationToken cancellationToken)
5454
{
55-
_connection = _channelOptions switch
55+
try
5656
{
57-
MqttClientTcpOptions tcpOptions => await ClientConnectionContext.CreateAsync(tcpOptions, cancellationToken).ConfigureAwait(false),
58-
MqttClientWebSocketOptions webSocketOptions => await ClientConnectionContext.CreateAsync(webSocketOptions, cancellationToken).ConfigureAwait(false),
59-
_ => throw new NotSupportedException(),
60-
};
61-
_channel = new MqttChannel(_packetFormatterAdapter, _connection, _packetInspector, _allowPacketFragmentation);
57+
_connection = _channelOptions switch
58+
{
59+
MqttClientTcpOptions tcpOptions => await ClientConnectionContext.CreateAsync(tcpOptions, cancellationToken).ConfigureAwait(false),
60+
MqttClientWebSocketOptions webSocketOptions => await ClientConnectionContext.CreateAsync(webSocketOptions, cancellationToken).ConfigureAwait(false),
61+
_ => throw new NotSupportedException(),
62+
};
63+
_channel = new MqttChannel(_packetFormatterAdapter, _connection, _packetInspector, _allowPacketFragmentation);
64+
}
65+
catch (Exception ex)
66+
{
67+
if (!MqttChannel.WrapAndThrowException(ex))
68+
{
69+
throw;
70+
}
71+
}
6272
}
6373

6474
public Task DisconnectAsync(CancellationToken cancellationToken)

Source/MQTTnet.Tests/ASP/MqttConnectionContextTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public async Task TestCorruptedConnectPacket()
3737
await Assert.ThrowsExceptionAsync<MqttProtocolViolationException>(() => ctx.ReceivePacketAsync(CancellationToken.None));
3838

3939
// the first exception should complete the pipes so if someone tries to use the connection after that it should throw immidiatly
40-
await Assert.ThrowsExceptionAsync<InvalidOperationException>(() => ctx.ReceivePacketAsync(CancellationToken.None));
40+
await Assert.ThrowsExceptionAsync<MqttCommunicationException>(() => ctx.ReceivePacketAsync(CancellationToken.None));
4141
}
4242

4343
// TODO: Fix test

Source/MQTTnet.Tests/Clients/LowLevelMqttClient/LowLevelMqttClient_Tests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public async Task Connect_To_Wrong_Host()
8080
[TestMethod]
8181
public async Task Loose_Connection()
8282
{
83-
using var testEnvironments = CreateMQTTnetTestEnvironment();
83+
using var testEnvironments = CreateMixedTestEnvironment();
8484
foreach (var testEnvironment in testEnvironments)
8585
{
8686
testEnvironment.IgnoreServerLogErrors = true;

Source/MQTTnet.Tests/MQTTv5/Server_Tests.cs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -182,21 +182,9 @@ public async Task Reconnect_with_different_SessionExpiryInterval()
182182
}
183183

184184
[TestMethod]
185-
public async Task Disconnect_with_Reason_MQTTnet()
186-
{
187-
using var testEnvironments = CreateMQTTnetTestEnvironment();
188-
await Disconnect_with_Reason(testEnvironments);
189-
}
190-
191-
[TestMethod]
192-
public async Task Disconnect_with_Reason_AspNetCore()
193-
{
194-
using var testEnvironments = CreateAspNetCoreTestEnvironment();
195-
await Disconnect_with_Reason(testEnvironments);
196-
}
197-
198-
private async Task Disconnect_with_Reason(TestEnvironmentCollection testEnvironments)
185+
public async Task Disconnect_with_Reason()
199186
{
187+
using var testEnvironments = CreateMixedTestEnvironment();
200188
foreach (var testEnvironment in testEnvironments)
201189
{
202190
var disconnectReasonTaskSource = new TaskCompletionSource<MqttClientDisconnectReason>();

Source/MQTTnet.Tests/Mockups/AspNetCoreTestEnvironment.cs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Microsoft.Extensions.Hosting;
99
using Microsoft.VisualStudio.TestTools.UnitTesting;
1010
using MQTTnet.AspNetCore;
11+
using MQTTnet.Diagnostics.Logger;
1112
using MQTTnet.Formatter;
1213
using MQTTnet.Internal;
1314
using MQTTnet.LowLevelClient;
@@ -47,8 +48,11 @@ protected override ILowLevelMqttClient CreateLowLevelClientCore()
4748
private IMqttClientFactory CreateClientFactory()
4849
{
4950
var services = new ServiceCollection();
50-
var clientBuilder = services.AddMqttClient();
51-
UseMqttLogger(clientBuilder, "[CLIENT]=>");
51+
52+
var logger = EnableLogger ? (IMqttNetLogger)ClientLogger : MqttNetNullLogger.Instance;
53+
services.AddSingleton(logger);
54+
services.AddMqttClient();
55+
5256
return services.BuildServiceProvider().GetRequiredService<IMqttClientFactory>();
5357
}
5458

@@ -67,6 +71,7 @@ public override Task<MqttServer> StartServer(Action<MqttServerOptionsBuilder> co
6771
public override Task<MqttServer> StartServer(MqttServerOptionsBuilder optionsBuilder)
6872
{
6973
optionsBuilder.WithDefaultEndpoint();
74+
optionsBuilder.WithDefaultEndpointPort(ServerPort);
7075
optionsBuilder.WithMaxPendingMessagesPerClient(int.MaxValue);
7176
var serverOptions = optionsBuilder.Build();
7277
return StartServer(serverOptions);
@@ -88,14 +93,14 @@ private async Task<MqttServer> StartServer(MqttServerOptions serverOptions)
8893
var appBuilder = WebApplication.CreateBuilder();
8994
appBuilder.Services.AddSingleton(serverOptions);
9095

91-
var serverBuilder = appBuilder.Services.AddMqttServer();
92-
UseMqttLogger(serverBuilder, "[SERVER]=>");
96+
var logger = EnableLogger ? (IMqttNetLogger)ServerLogger : new MqttNetNullLogger();
97+
appBuilder.Services.AddSingleton(logger);
98+
appBuilder.Services.AddMqttServer();
9399

94100
appBuilder.WebHost.UseKestrel(k => k.ListenMqtt());
95101
appBuilder.Host.ConfigureHostOptions(h => h.ShutdownTimeout = TimeSpan.FromMilliseconds(500d));
96102

97103
_app = appBuilder.Build();
98-
await _app.StartAsync();
99104

100105
Server = _app.Services.GetRequiredService<MqttServer>();
101106
ServerPort = serverOptions.DefaultEndpointOptions.Port;
@@ -115,6 +120,12 @@ private async Task<MqttServer> StartServer(MqttServerOptions serverOptions)
115120
return CompletedTask.Instance;
116121
};
117122

123+
var appStartedSource = new TaskCompletionSource();
124+
_app.Lifetime.ApplicationStarted.Register(() => appStartedSource.TrySetResult());
125+
126+
await _app.StartAsync();
127+
await appStartedSource.Task;
128+
118129
return Server;
119130
}
120131

@@ -132,18 +143,6 @@ private static int GetServerPort()
132143
return port;
133144
}
134145

135-
private void UseMqttLogger(IMqttBuilder builder, string categoryNamePrefix)
136-
{
137-
if (EnableLogger)
138-
{
139-
builder.UseAspNetCoreMqttNetLogger(l => l.CategoryNamePrefix = categoryNamePrefix);
140-
}
141-
else
142-
{
143-
builder.UseMqttNetNullLogger();
144-
}
145-
}
146-
147146
public override void Dispose()
148147
{
149148
base.Dispose();

Source/MQTTnet.Tests/Server/Keep_Alive_Tests.cs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,9 @@ namespace MQTTnet.Tests.Server
1717
public sealed class KeepAlive_Tests : BaseTestClass
1818
{
1919
[TestMethod]
20-
public async Task Disconnect_Client_DueTo_KeepAlive_MQTTnet()
21-
{
22-
using var testEnvironments = CreateMQTTnetTestEnvironment();
23-
await Disconnect_Client_DueTo_KeepAlive(testEnvironments);
24-
}
25-
26-
[TestMethod]
27-
public async Task Disconnect_Client_DueTo_KeepAlive_AspNetCore()
28-
{
29-
using var testEnvironments = CreateAspNetCoreTestEnvironment();
30-
await Disconnect_Client_DueTo_KeepAlive(testEnvironments);
31-
}
32-
33-
private async Task Disconnect_Client_DueTo_KeepAlive(TestEnvironmentCollection testEnvironments)
20+
public async Task Disconnect_Client_DueTo_KeepAlive()
3421
{
22+
using var testEnvironments = CreateMixedTestEnvironment();
3523
foreach (var testEnvironment in testEnvironments)
3624
{
3725
await testEnvironment.StartServer();

Source/MQTTnet.Tests/Server/Session_Tests.cs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -228,22 +228,9 @@ public async Task Retry_If_Not_PubAck(MqttQualityOfServiceLevel qos)
228228
}
229229

230230
[TestMethod]
231-
public async Task Session_Takeover_MQTTnet()
232-
{
233-
using var testEnvironments = CreateMQTTnetTestEnvironment();
234-
await Session_Takeover(testEnvironments);
235-
}
236-
237-
[TestMethod]
238-
public async Task Session_Takeover_AspNetCore()
239-
{
240-
using var testEnvironments = CreateAspNetCoreTestEnvironment();
241-
await Session_Takeover(testEnvironments);
242-
}
243-
244-
245-
private async Task Session_Takeover(TestEnvironmentCollection testEnvironments)
231+
public async Task Session_Takeover()
246232
{
233+
using var testEnvironments = CreateMixedTestEnvironment();
247234
foreach (var testEnvironment in testEnvironments)
248235
{
249236
await testEnvironment.StartServer();

0 commit comments

Comments
 (0)