Skip to content

Commit 36cc417

Browse files
author
Rajeev Vokkarne
committed
Merge with master
2 parents b30c058 + f35e72a commit 36cc417

File tree

7 files changed

+167
-45
lines changed

7 files changed

+167
-45
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ For simple markdown files, we accept documentation pull requests submitted again
2525
If your PR is about future changes or has changes to the comments in the code itself, we'll treat is as a code change (see the next section).
2626

2727
# Contribute code
28-
Unlike documentation, we require pull-requests for code to be submitted against the `develop` branch in order to review and run it in our gated build system. We try to maintain a high bar
29-
for code quality and maintainability, we insist on having tests associated with the code, and if necessary, additions/modifications to the requirement documents.
28+
Pull-requests for code to be submitted against the `master` branch. We will review the request and once approved we will be running it in our gated build system. We try to maintain a high bar for code quality and maintainability, we insist on having tests associated with the code, and if necessary, additions/modifications to the requirement documents.
3029

3130
Also, have you signed the [Contribution License Agreement](https://cla.microsoft.com/) ([CLA](https://cla.microsoft.com/))? A friendly bot will remind you about it when you submit your pull-request.
3231

device/Microsoft.Azure.Devices.Client/Transport/AmqpTransportHandler.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,6 @@ internal AmqpTransportHandler(
4343
Func<MethodRequestInternal, Task> onMethodCallback = null)
4444
:base(context, transportSettings)
4545
{
46-
if (onLinkClosedCallback == null)
47-
{
48-
throw new InvalidOperationException("onLinkClosedCallback is null in AmqpTransportHandler");
49-
}
5046
this.linkClosedListener = onLinkClosedCallback;
5147

5248
TransportType transportType = transportSettings.GetTransportType();

device/Microsoft.Azure.Devices.Client/Transport/Mqtt/MqttTransportHandler.cs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -170,28 +170,36 @@ internal enum TransportState
170170
const string twinResponseTopicPattern = @"\$iothub/twin/res/(\d+)/(\?.+)";
171171
Regex twinResponseTopicRegex = new Regex(twinResponseTopicPattern, RegexOptions.None);
172172

173+
Action<object, EventArgs> connectionClosedListener;
173174
Func<MethodRequestInternal, Task> messageListener;
174175
Action<TwinCollection> onReportedStatePatchListener;
175176
Action<Message> twinResponseEvent;
176177

177178
public TimeSpan TwinTimeout = TimeSpan.FromSeconds(60);
178-
179-
internal MqttTransportHandler(IPipelineContext context, IotHubConnectionString iotHubConnectionString)
180-
: this(context, iotHubConnectionString, new MqttTransportSettings(TransportType.Mqtt_Tcp_Only))
181-
{
182-
183-
}
184179

185-
internal MqttTransportHandler(IPipelineContext context, IotHubConnectionString iotHubConnectionString, MqttTransportSettings settings, Func<MethodRequestInternal, Task> onMethodCallback = null, Action<TwinCollection> onReportedStatePatchReceivedCallback = null)
186-
: this(context, iotHubConnectionString, settings, null)
180+
internal MqttTransportHandler(
181+
IPipelineContext context,
182+
IotHubConnectionString iotHubConnectionString,
183+
MqttTransportSettings settings,
184+
Action<object, EventArgs> onConnectionClosedCallback,
185+
Func<MethodRequestInternal, Task> onMethodCallback = null,
186+
Action<TwinCollection> onReportedStatePatchReceivedCallback = null)
187+
: this(context, iotHubConnectionString, settings, null, onConnectionClosedCallback)
187188
{
188189
this.messageListener = onMethodCallback;
189190
this.onReportedStatePatchListener = onReportedStatePatchReceivedCallback;
190191
}
191192

192-
internal MqttTransportHandler(IPipelineContext context, IotHubConnectionString iotHubConnectionString, MqttTransportSettings settings, Func<IPAddress, int, Task<IChannel>> channelFactory)
193+
internal MqttTransportHandler(
194+
IPipelineContext context,
195+
IotHubConnectionString iotHubConnectionString,
196+
MqttTransportSettings settings,
197+
Func<IPAddress, int, Task<IChannel>> channelFactory,
198+
Action<object, EventArgs> onConnectionClosedCallback)
193199
: base(context, settings)
194200
{
201+
this.connectionClosedListener = onConnectionClosedCallback;
202+
195203
this.mqttIotHubAdapterFactory = new MqttIotHubAdapterFactory(settings);
196204
this.messageQueue = new ConcurrentQueue<Message>();
197205
this.completionQueue = new Queue<string>();
@@ -462,7 +470,7 @@ internal void OnMessageReceived(Message message)
462470
}
463471
}
464472

465-
async void OnError(Exception exception)
473+
internal async void OnError(Exception exception)
466474
{
467475
try
468476
{
@@ -490,15 +498,32 @@ async void OnError(Exception exception)
490498
default:
491499
throw new ArgumentOutOfRangeException();
492500
}
493-
494501
await this.closeRetryPolicy.ExecuteAsync(this.CleanupAsync);
502+
503+
// Codes_SRS_CSHARP_MQTT_TRANSPORT_28_04: If OnError is triggered after OpenAsync is called, onConnectionClosedCallback shall be invoked.
504+
// Codes_SRS_CSHARP_MQTT_TRANSPORT_28_05: If OnError is triggered after ReceiveAsync is called, onConnectionClosedCallback shall be invoked.
505+
// Codes_SRS_CSHARP_MQTT_TRANSPORT_28_06: If OnError is triggered without any prior operation, onConnectionClosedCallback shall not be invoked.
506+
// Codes_SRS_CSHARP_MQTT_TRANSPORT_28_07: If OnError is triggered in error state, onConnectionClosedCallback shall not be invoked.
507+
508+
if ((previousState & TransportState.Open) == TransportState.Open)
509+
{
510+
this.connectionClosedListener(this, null);
511+
}
512+
495513
}
496514
catch (Exception ex) when (!ex.IsFatal())
497515
{
498516

499517
}
500518
}
501519

520+
public override Task RecoverConnections(object link, CancellationToken cancellationToken)
521+
{
522+
// Codes_SRS_CSHARP_MQTT_TRANSPORT_28_08: [** `RecoverConnections` shall throw IotHubClientException exception when in error state.
523+
this.EnsureValidState();
524+
return Common.TaskConstants.Completed;
525+
}
526+
502527
TransportState MoveToStateIfPossible(TransportState destination, TransportState illegalStates)
503528
{
504529
TransportState previousState = this.State;

device/Microsoft.Azure.Devices.Client/Transport/TransportHandlerFactory.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ public IDelegatingHandler Create(IPipelineContext context)
3333
#if !NETMF && !PCL
3434
case TransportType.Mqtt_Tcp_Only:
3535
case TransportType.Mqtt_WebSocket_Only:
36-
return new MqttTransportHandler(context, connectionString, transportSetting as MqttTransportSettings, new Func<MethodRequestInternal, Task>(onMethodCallback), onReportedStatePatchReceived);
36+
return new MqttTransportHandler(
37+
context, connectionString, transportSetting as MqttTransportSettings,
38+
new Action<object, EventArgs>(OnConnectionClosedCallback),
39+
new Func<MethodRequestInternal, Task>(onMethodCallback), onReportedStatePatchReceived);
3740
#endif
3841
default:
3942
throw new InvalidOperationException("Unsupported Transport Setting {0}".FormatInvariant(transportSetting));

device/devdoc/requirements/mqtt_transport_requirements.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,22 @@ The Mqtt Transport is used to communicate between the DeviceClient object and th
1212

1313
sealed class MqttTransportHandler : TransportHandler
1414
{
15+
internal MqttTransportHandler(
16+
IPipelineContext context,
17+
IotHubConnectionString iotHubConnectionString,
18+
MqttTransportSettings settings,
19+
Action<object, EventArgs> onConnectionClosedCallback,
20+
Func<MethodRequestInternal, Task> onMethodCallback = null,
21+
Action<TwinCollection> onReportedStatePatchReceivedCallback = null)
22+
1523
public static MqttTransportHandler Create(string hostname, IAuthenticationMethod authMethod);
1624
public static MqttTransportHandler CreateFromConnectionString(string connectionString);
1725
public override async Task OpenAsync(bool explicitOpen, CancellationToken cancellationToken);
1826
public override async Task SendEventAsync(Message message, CancellationToken cancellationToken);
1927
public override async Task SendEventAsync(IEnumerable<Message> messages, CancellationToken cancellationToken);
2028
public override async Task<Message> ReceiveAsync(TimeSpan timeout, CancellationToken cancellationToken);
2129
public override async Task CompleteAsync(string lockToken, CancellationToken cancellationToken);
30+
public override Task RecoverConnections(object link, CancellationToken cancellationToken);
2231
public override Task AbandonAsync(string lockToken, CancellationToken cancellationToken);
2332
public override Task RejectAsync(string lockToken, CancellationToken cancellationToken);
2433
public override async Task CloseAsync();
@@ -34,13 +43,40 @@ sealed class MqttTransportHandler : TransportHandler
3443

3544
```
3645

46+
### MqttTransportHandler
47+
```csharp
48+
internal MqttTransportHandler(
49+
IPipelineContext context,
50+
IotHubConnectionString iotHubConnectionString,
51+
MqttTransportSettings settings,
52+
Action<object, EventArgs> onConnectionClosedCallback,
53+
Func<MethodRequestInternal, Task> onMethodCallback = null,
54+
Action<TwinCollection> onReportedStatePatchReceivedCallback = null)
55+
```
56+
**SRS_CSHARP_MQTT_TRANSPORT_28_04: [** If OnError is triggered after OpenAsync is called, onConnectionClosedCallback shall be invoked. **]**
57+
58+
**SRS_CSHARP_MQTT_TRANSPORT_28_05: [** If OnError is triggered after ReceiveAsync is called, onConnectionClosedCallback shall be invoked. **]**
59+
60+
**SRS_CSHARP_MQTT_TRANSPORT_28_06: [** If OnError is triggered without any prior operation, onConnectionClosedCallback shall not be invoked. **]**
61+
62+
**SRS_CSHARP_MQTT_TRANSPORT_28_07: [** If OnError is triggered in error state, onConnectionClosedCallback shall not be invoked. **]**
63+
64+
65+
3766
### OpenAsync
3867
```csharp
3968
public override async Task OpenAsync(bool explicitOpen, CancellationToken cancellationToken);
4069
```
4170
**SRS_CSHARP_MQTT_TRANSPORT_18_031: [** `OpenAsync` shall subscribe using the '$iothub/twin/res/#' topic filter **]**
4271

4372

73+
## RecoverConnections
74+
```csharp
75+
public override Task RecoverConnections(object link, CancellationToken cancellationToken);
76+
```
77+
**SRS_CSHARP_MQTT_TRANSPORT_28_08: [** `RecoverConnections` shall throw IotHubClientException exception when in error state. **]**
78+
79+
4480

4581
### EnableMethodsAsync
4682
```csharp

device/tests/Microsoft.Azure.Devices.Client.Test/MqttTransportHandlerTests.cs

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ namespace Microsoft.Azure.Devices.Client.Test.Transport.Mqtt
1515
using DotNetty.Codecs.Mqtt.Packets;
1616
using Newtonsoft.Json;
1717
using System.IO;
18+
using Exceptions;
1819

1920
[TestClass]
2021
public class MqttTransportHandlerTests
@@ -101,7 +102,7 @@ public async Task MqttTransportHandler_SendTwinPatchAsync_TokenCancellationReque
101102

102103
MqttTransportHandler CreateFromConnectionString()
103104
{
104-
return new MqttTransportHandler(new PipelineContext(), IotHubConnectionString.Parse(DumpyConnectionString), new MqttTransportSettings(Microsoft.Azure.Devices.Client.TransportType.Mqtt_Tcp_Only));
105+
return new MqttTransportHandler(new PipelineContext(), IotHubConnectionString.Parse(DumpyConnectionString), new MqttTransportSettings(Microsoft.Azure.Devices.Client.TransportType.Mqtt_Tcp_Only), (o, ea) => { });
105106
}
106107

107108
async Task TestOperationCanceledByToken(Func<CancellationToken, Task> asyncMethod)
@@ -120,6 +121,11 @@ async Task TestOperationCanceledByToken(Func<CancellationToken, Task> asyncMetho
120121
}
121122

122123
MqttTransportHandler CreateTransportHandlerWithMockChannel(out IChannel channel)
124+
{
125+
return CreateTransportHandlerWithMockChannel(out channel, (o, ea) => { });
126+
}
127+
128+
MqttTransportHandler CreateTransportHandlerWithMockChannel(out IChannel channel, Action<object, EventArgs> onConnectionClosedCallback)
123129
{
124130
var _channel = Substitute.For<IChannel>();
125131
channel = _channel;
@@ -132,8 +138,8 @@ MqttTransportHandler CreateTransportHandlerWithMockChannel(out IChannel channel)
132138
transport.OnConnected();
133139
return Task<IChannel>.FromResult<IChannel>(_channel);
134140
};
135-
136-
transport = new MqttTransportHandler(new PipelineContext(), IotHubConnectionString.Parse(DumpyConnectionString), new MqttTransportSettings(Microsoft.Azure.Devices.Client.TransportType.Mqtt_Tcp_Only), factory);
141+
142+
transport = new MqttTransportHandler(new PipelineContext(), IotHubConnectionString.Parse(DumpyConnectionString), new MqttTransportSettings(Microsoft.Azure.Devices.Client.TransportType.Mqtt_Tcp_Only), factory, onConnectionClosedCallback);
137143
return transport;
138144
}
139145

@@ -488,7 +494,88 @@ public async Task MqttTransportHandler_SendTwinPatchAsync_TimesOut()
488494
await transport.SendTwinPatchAsync(props, CancellationToken.None);
489495
}
490496

497+
// Tests_SRS_CSHARP_MQTT_TRANSPORT_28_04: If OnError is triggered after OpenAsync is called, onConnectionClosedCallback shall be invoked.
498+
public async Task MqttTransportHandler_OnError_CallConnectionClosedListener_Open()
499+
{
500+
// arrange
501+
IChannel channel;
502+
bool isCalled = false;
503+
var transport = CreateTransportHandlerWithMockChannel(out channel, (o, ea) => { isCalled = true; });
504+
transport.OnConnected();
505+
await transport.OpenAsync(true, CancellationToken.None);
506+
507+
// act
508+
transport.OnError(new ApplicationException("Testing"));
509+
510+
// assert
511+
Assert.IsTrue(isCalled);
512+
}
513+
514+
// Tests_SRS_CSHARP_MQTT_TRANSPORT_28_05: If OnError is triggered after ReceiveAsync is called, onConnectionClosedCallback shall be invoked.
515+
[TestMethod]
516+
public async Task MqttTransportHandler_OnError_CallConnectionClosedListener_Receiving()
517+
{
518+
// arrange
519+
IChannel channel;
520+
bool isCalled = false;
521+
var transport = CreateTransportHandlerWithMockChannel(out channel, (o, ea) => { isCalled = true; });
522+
transport.OnConnected();
523+
await transport.OpenAsync(true, CancellationToken.None);
524+
await transport.ReceiveAsync(new TimeSpan(0, 0, 0, 0, 5), CancellationToken.None);
525+
526+
// act
527+
transport.OnError(new ApplicationException("Testing"));
491528

529+
// assert
530+
Assert.IsTrue(isCalled);
531+
}
532+
533+
// Tests_SRS_CSHARP_MQTT_TRANSPORT_28_06: If OnError is triggered without any prior operation, onConnectionClosedCallback shall not be invoked.
534+
[TestMethod]
535+
public async Task MqttTransportHandler_OnError_CallConnectionClosedListener_NotInitialized()
536+
{
537+
// arrange
538+
IChannel channel;
539+
bool isCalled = false;
540+
var transport = CreateTransportHandlerWithMockChannel(out channel, (o, ea) => { isCalled = true; });
541+
542+
// act
543+
transport.OnError(new ApplicationException("Testing"));
544+
545+
// assert
546+
Assert.IsFalse(isCalled);
547+
}
548+
549+
// Tests_SRS_CSHARP_MQTT_TRANSPORT_28_07: If OnError is triggered in error state, onConnectionClosedCallback shall not be invoked.
550+
[TestMethod]
551+
public async Task MqttTransportHandler_OnError_CallConnectionClosedListener_Error()
552+
{
553+
// arrange
554+
IChannel channel;
555+
bool isCalled = false;
556+
var transport = CreateTransportHandlerWithMockChannel(out channel, (o, ea) => { isCalled = true; });
557+
transport.OnError(new ApplicationException("Testing"));
558+
559+
// act
560+
transport.OnError(new ApplicationException("Testing"));
561+
562+
// assert
563+
Assert.IsFalse(isCalled);
564+
}
565+
566+
// Tests_SRS_CSHARP_MQTT_TRANSPORT_28_08: [** `RecoverConnections` shall throw IotHubClientException exception when in error state.
567+
[TestMethod]
568+
[ExpectedException(typeof(IotHubClientException))]
569+
public async Task MqttTransportHandler_RecoverConnections_throw()
570+
{
571+
// arrange
572+
IChannel channel;
573+
var transport = CreateTransportHandlerWithMockChannel(out channel);
574+
transport.OnError(new ApplicationException("Testing"));
575+
576+
// act
577+
await transport.RecoverConnections(null, CancellationToken.None);
578+
}
492579
}
493580

494581
}

doc/faq.md

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -109,30 +109,6 @@ UWP uses the Windows Store resource model that replaces the hub-and-spoke model
109109

110110
To support resources in Microsoft.Azure.Devices.Client library, the existing Resource.resx file has been copied to Resource.resw. The two files will now need to be kept in sync. Unlike in the .NET version of the library, the UWP version does not contain generated C# files. Instead, a new file, UWPResources.cs is introduced. Whenever a new string is added to the .resx/.resw file, a corresponding entry must be copied from Resources.Designer.cs to UWPResources.cs (follow the existing entries as an example)
111111

112-
<a name="notimpluwp"/>
113-
## System.NotImplementedException occurred in Microsoft.Azure.Devices.Client.winmd
114-
The Universal Windows Platform (UWP) version of the .NET client device library does **not** currently support **MQTT** protocol.
115-
116-
For example, calling `DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(DeviceConnectionString, TransportType.Mqtt);`
117-
will result in "Mqtt protocol is not supported" exception.
118-
119-
<a name="httpexception"/>
120-
## IotHubCommunicationException or FileNotFoundException thrown when using HTTP protocol
121-
122-
The **DeviceClient** class in the Microsoft.Azure.Devices.Client package requires the **System.Net.Http.Formatting** class to communicate with IoT Hub over HTTP.
123-
124-
You see an **IotHubCommunicationException** or **FileNotFoundException** exception when you try to use the HTTP protocol to send a device-to-cloud message:
125-
126-
```
127-
DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(DeviceConnectionString, TransportType.Http1);
128-
129-
...
130-
131-
await deviceClient.SendEventAsync(eventMessage);
132-
```
133-
134-
To prevent these exceptions from happening, you should add **Microsoft.AspNet.WebApi.Client** NuGet package to your project.
135-
136112
<a name="javapi2error"/>
137113
## Error when using AMQP on Raspberry Pi2
138114
When building Qpid using Maven on a Raspberry Pi2 you might encounter a known error with the *SureFire* plugin:

0 commit comments

Comments
 (0)