Skip to content

Commit e0a066c

Browse files
progress
1 parent eb1eb79 commit e0a066c

File tree

6 files changed

+113
-134
lines changed

6 files changed

+113
-134
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using Azure.Iot.Operations.Protocol;
5+
using Azure.Iot.Operations.Protocol.Connection;
6+
using Azure.Iot.Operations.Protocol.Models;
7+
using IMqttClient = MQTTnet.IMqttClient;
8+
9+
namespace Azure.Iot.Operations.Mqtt;
10+
11+
public class ExtendedPubSubMqttClient(IMqttClient mqttNetClient, OrderedAckMqttClientOptions? clientOptions = null)
12+
: OrderedAckMqttClient(mqttNetClient, clientOptions), IExtendedPubSubMqttClient
13+
{
14+
private MqttClientConnectResult? _connectResult;
15+
16+
public override async Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions options, CancellationToken cancellationToken = default)
17+
{
18+
var connectResult = await base.ConnectAsync(options, cancellationToken);
19+
_connectResult = connectResult;
20+
return connectResult;
21+
}
22+
23+
public override async Task<MqttClientConnectResult> ConnectAsync(MqttConnectionSettings settings, CancellationToken cancellationToken = default)
24+
{
25+
var connectResult = await base.ConnectAsync(settings, cancellationToken);
26+
_connectResult = connectResult;
27+
return connectResult;
28+
}
29+
30+
public MqttClientConnectResult? GetConnectResult()
31+
{
32+
return _connectResult;
33+
}
34+
}

dotnet/src/Azure.Iot.Operations.Protocol/Chunking/ChunkingMqttClient.cs renamed to dotnet/src/Azure.Iot.Operations.Protocol/Chunking/ChunkingMqttPubSubClient.cs

Lines changed: 9 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
using Azure.Iot.Operations.Protocol.Connection;
54
using Azure.Iot.Operations.Protocol.Events;
65
using Azure.Iot.Operations.Protocol.Models;
76
using System;
@@ -17,78 +16,32 @@ namespace Azure.Iot.Operations.Protocol.Chunking;
1716
/// <summary>
1817
/// MQTT client middleware that provides transparent chunking of large messages.
1918
/// </summary>
20-
public class ChunkingMqttClient : IMqttClient
19+
public class ChunkingMqttPubSubClient : IMqttPubSubClient
2120
{
22-
private readonly IMqttClient _innerClient;
21+
private readonly IExtendedPubSubMqttClient _innerClient;
2322
private readonly ChunkingOptions _chunkingOptions;
2423
private readonly ConcurrentDictionary<string, ChunkedMessageAssembler> _messageAssemblers = new();
2524
private readonly ChunkedMessageSplitter _messageSplitter;
2625
private int _maxPacketSize;
2726

2827
/// <summary>
29-
/// Initializes a new instance of the <see cref="ChunkingMqttClient"/> class.
28+
/// Initializes a new instance of the <see cref="ChunkingMqttPubSubClient"/> class.
3029
/// </summary>
3130
/// <param name="innerClient">The MQTT client to wrap with chunking capabilities.</param>
3231
/// <param name="options">The chunking options.</param>
33-
public ChunkingMqttClient(IMqttClient innerClient, ChunkingOptions? options = null)
32+
public ChunkingMqttPubSubClient(IExtendedPubSubMqttClient innerClient, ChunkingOptions? options = null)
3433
{
3534
_innerClient = innerClient ?? throw new ArgumentNullException(nameof(innerClient));
3635
_chunkingOptions = options ?? new ChunkingOptions();
3736
_messageSplitter = new ChunkedMessageSplitter(_chunkingOptions);
3837

39-
// Hook into the inner client's event
38+
UpdateMaxPacketSizeFromConnectResult(_innerClient.GetConnectResult());
39+
4040
_innerClient.ApplicationMessageReceivedAsync += HandleApplicationMessageReceivedAsync;
41-
_innerClient.ConnectedAsync += HandleConnectedAsync;
42-
_innerClient.DisconnectedAsync += HandleDisconnectedAsync;
4341
}
4442

45-
/// <inheritdoc/>
4643
public event Func<MqttApplicationMessageReceivedEventArgs, Task>? ApplicationMessageReceivedAsync;
4744

48-
/// <inheritdoc/>
49-
public event Func<MqttClientDisconnectedEventArgs, Task>? DisconnectedAsync;
50-
51-
/// <inheritdoc/>
52-
public event Func<MqttClientConnectedEventArgs, Task>? ConnectedAsync;
53-
54-
/// <inheritdoc/>
55-
public async Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions options, CancellationToken cancellationToken = default)
56-
{
57-
var result = await _innerClient.ConnectAsync(options, cancellationToken);
58-
59-
UpdateMaxPacketSizeFromConnectResult(result);
60-
61-
return result;
62-
}
63-
64-
/// <inheritdoc/>
65-
public async Task<MqttClientConnectResult> ConnectAsync(MqttConnectionSettings settings, CancellationToken cancellationToken = default)
66-
{
67-
var result = await _innerClient.ConnectAsync(settings, cancellationToken);
68-
69-
UpdateMaxPacketSizeFromConnectResult(result);
70-
71-
return result;
72-
}
73-
74-
/// <inheritdoc/>
75-
public Task DisconnectAsync(MqttClientDisconnectOptions? options = null, CancellationToken cancellationToken = default)
76-
{
77-
return _innerClient.DisconnectAsync(options, cancellationToken);
78-
}
79-
80-
public Task ReconnectAsync(CancellationToken cancellationToken = default)
81-
{
82-
return _innerClient.ReconnectAsync(cancellationToken);
83-
}
84-
85-
public bool IsConnected => _innerClient.IsConnected;
86-
87-
public Task SendEnhancedAuthenticationExchangeDataAsync(MqttEnhancedAuthenticationExchangeData data, CancellationToken cancellationToken = default)
88-
{
89-
return _innerClient.SendEnhancedAuthenticationExchangeDataAsync(data, cancellationToken);
90-
}
91-
9245
/// <inheritdoc/>
9346
public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken = default)
9447
{
@@ -130,25 +83,21 @@ public ValueTask DisposeAsync()
13083

13184
// Detach events
13285
_innerClient.ApplicationMessageReceivedAsync -= HandleApplicationMessageReceivedAsync;
133-
_innerClient.ConnectedAsync -= HandleConnectedAsync;
134-
_innerClient.DisconnectedAsync -= HandleDisconnectedAsync;
13586

13687
// Suppress finalization since we're explicitly disposing
13788
GC.SuppressFinalize(this);
13889

13990
return _innerClient.DisposeAsync();
14091
}
14192

142-
private void UpdateMaxPacketSizeFromConnectResult(MqttClientConnectResult result)
93+
private void UpdateMaxPacketSizeFromConnectResult(MqttClientConnectResult? result)
14394
{
144-
if (_chunkingOptions.Enabled && result.MaximumPacketSize is not > 0)
95+
if (_chunkingOptions.Enabled && result?.MaximumPacketSize is not > 0)
14596
{
14697
throw new InvalidOperationException("Chunking client requires a defined maximum packet size to function properly.");
14798
}
14899

149-
// TODO: @maximsemnov80 figure out how to set the max packet size on the broker side
150-
// Interlocked.Exchange(ref _maxPacketSize, (int)result.MaximumPacketSize!.Value);
151-
_maxPacketSize = 64*1024; // 64KB
100+
Interlocked.Exchange(ref _maxPacketSize, (int)result!.MaximumPacketSize!.Value);
152101
}
153102

154103
private async Task<MqttClientPublishResult> PublishChunkedMessageAsync(MqttApplicationMessage message, CancellationToken cancellationToken)
@@ -262,21 +211,4 @@ private static bool TryGetChunkMetadata(MqttApplicationMessage message, out Chun
262211
return false;
263212
}
264213
}
265-
266-
private Task HandleConnectedAsync(MqttClientConnectedEventArgs args)
267-
{
268-
// Forward the event
269-
var handler = ConnectedAsync;
270-
return handler != null ? handler.Invoke(args) : Task.CompletedTask;
271-
}
272-
273-
private Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs args)
274-
{
275-
// Clear any in-progress reassembly when disconnected
276-
_messageAssemblers.Clear();
277-
278-
// Forward the event
279-
var handler = DisconnectedAsync;
280-
return handler != null ? handler.Invoke(args) : Task.CompletedTask;
281-
}
282214
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using Azure.Iot.Operations.Protocol.Models;
5+
6+
namespace Azure.Iot.Operations.Protocol;
7+
8+
public interface IExtendedPubSubMqttClient : IMqttPubSubClient
9+
{
10+
MqttClientConnectResult? GetConnectResult();
11+
}

dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/Chunking/ChunkingMqttClientIntegrationTests.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public async Task ChunkingMqttClient_SmallMessage_NoChunking()
2323
{
2424
// Arrange
2525
// Create a base client
26-
var baseClient = await ClientFactory.CreateClientAsyncFromEnvAsync(Guid.NewGuid().ToString());
26+
await using var mqttClient = await ClientFactory.CreateExtendedClientAsyncFromEnvAsync(Guid.NewGuid().ToString());
2727

2828
// Create a chunking client with modest settings
2929
var options = new ChunkingOptions
@@ -33,7 +33,7 @@ public async Task ChunkingMqttClient_SmallMessage_NoChunking()
3333
ChunkTimeout = TimeSpan.FromSeconds(10)
3434
};
3535

36-
await using var chunkingClient = new ChunkingMqttClient(baseClient, options);
36+
await using var chunkingClient = new ChunkingMqttPubSubClient(mqttClient, options);
3737

3838
var messageReceivedTcs = new TaskCompletionSource<MqttApplicationMessage>();
3939
chunkingClient.ApplicationMessageReceivedAsync += (args) =>
@@ -87,16 +87,14 @@ public async Task ChunkingMqttClient_SmallMessage_NoChunking()
8787
var testProperty = receivedMessage.UserProperties?.FirstOrDefault(p => p.Name == "testProperty");
8888
Assert.NotNull(testProperty);
8989
Assert.Equal("testValue", testProperty!.Value);
90-
91-
await chunkingClient.DisconnectAsync();
9290
}
9391

9492
[Fact]
9593
public async Task ChunkingMqttClient_LargeMessage_ChunkingAndReassembly()
9694
{
9795
// Arrange
9896
// Create a base client
99-
var baseClient = await ClientFactory.CreateClientAsyncFromEnvAsync(Guid.NewGuid().ToString());
97+
await using var mqttClient = await ClientFactory.CreateExtendedClientAsyncFromEnvAsync(Guid.NewGuid().ToString());
10098

10199
// Create a chunking client with settings that force chunking
102100
var options = new ChunkingOptions
@@ -106,7 +104,7 @@ public async Task ChunkingMqttClient_LargeMessage_ChunkingAndReassembly()
106104
ChunkTimeout = TimeSpan.FromSeconds(30)
107105
};
108106

109-
await using var chunkingClient = new ChunkingMqttClient(baseClient, options);
107+
await using var chunkingClient = new ChunkingMqttPubSubClient(mqttClient, options);
110108

111109
var messageReceivedTcs = new TaskCompletionSource<MqttApplicationMessage>();
112110
chunkingClient.ApplicationMessageReceivedAsync += (args) =>
@@ -172,8 +170,6 @@ public async Task ChunkingMqttClient_LargeMessage_ChunkingAndReassembly()
172170
var testProperty = receivedMessage.UserProperties?.FirstOrDefault(p => p.Name == "testProperty");
173171
Assert.NotNull(testProperty);
174172
Assert.Equal("testValue", testProperty!.Value);
175-
176-
await chunkingClient.DisconnectAsync();
177173
}
178174

179175
/*

dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/ClientFactory.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,29 @@ public static async Task<OrderedAckMqttClient> CreateClientAsyncFromEnvAsync(str
3535
return orderedAckClient;
3636
}
3737

38+
public static async Task<ExtendedPubSubMqttClient> CreateExtendedClientAsyncFromEnvAsync(string clientId, bool withTraces = false, CancellationToken cancellationToken = default)
39+
{
40+
Debug.Assert(Environment.GetEnvironmentVariable("MQTT_TEST_BROKER_CS") != null);
41+
string cs = $"{Environment.GetEnvironmentVariable("MQTT_TEST_BROKER_CS")}";
42+
MqttConnectionSettings mcs = MqttConnectionSettings.FromConnectionString(cs);
43+
if (string.IsNullOrEmpty(clientId))
44+
{
45+
mcs.ClientId += Guid.NewGuid();
46+
}
47+
else
48+
{
49+
mcs.ClientId = clientId;
50+
}
51+
52+
MQTTnet.IMqttClient mqttClient = withTraces
53+
? new MQTTnet.MqttClientFactory().CreateMqttClient(MqttNetTraceLogger.CreateTraceLogger())
54+
: new MQTTnet.MqttClientFactory().CreateMqttClient();
55+
var extendedPubSubClient = new ExtendedPubSubMqttClient(mqttClient);
56+
await extendedPubSubClient.ConnectAsync(new MqttClientOptions(mcs), cancellationToken);
57+
58+
return extendedPubSubClient;
59+
}
60+
3861
public static async Task<MqttSessionClient> CreateSessionClientForFaultableBrokerFromEnv(List<MqttUserProperty>? ConnectUserProperties = null, string? clientId = null)
3962
{
4063
if (string.IsNullOrEmpty(clientId))

0 commit comments

Comments
 (0)