Skip to content

Commit 1046bd8

Browse files
progress
1 parent cf3a714 commit 1046bd8

File tree

3 files changed

+33
-18
lines changed

3 files changed

+33
-18
lines changed

dotnet/src/Azure.Iot.Operations.Protocol/AssemblyInfo.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
using System.Runtime.CompilerServices;
55

6-
[assembly: InternalsVisibleTo("Azure.Iot.Operations.Protocol.IntegrationTests")]
6+
// TODO: @maximsemenov80 it looks like assembly shoulb be signed for the integration tests to work
7+
//[assembly: InternalsVisibleTo("Azure.Iot.Operations.Protocol.IntegrationTests")]
78

89
namespace Azure.Iot.Operations.Protocol;
910

dotnet/src/Azure.Iot.Operations.Protocol/Chunking/ChunkingConstants.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ namespace Azure.Iot.Operations.Protocol.Chunking;
66
/// <summary>
77
/// Constants used for the MQTT message chunking feature.
88
/// </summary>
9-
internal static class ChunkingConstants
9+
//TODO: @maximsemenov80 public for testing purposes, should be internal
10+
public static class ChunkingConstants
1011
{
1112
/// <summary>
1213
/// The user property name used to store chunking metadata.

dotnet/src/Azure.Iot.Operations.Protocol/Chunking/ChunkingMqttClient.cs

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ namespace Azure.Iot.Operations.Protocol.Chunking;
2020
public class ChunkingMqttClient : IMqttClient
2121
{
2222
private readonly IMqttClient _innerClient;
23-
private readonly ChunkingOptions _options;
23+
private readonly ChunkingOptions _chunkingOptions;
2424
private readonly ConcurrentDictionary<string, ChunkedMessageAssembler> _messageAssemblers = new();
2525
private readonly ChunkedMessageSplitter _messageSplitter;
2626
private int _maxPacketSize;
@@ -33,8 +33,8 @@ public class ChunkingMqttClient : IMqttClient
3333
public ChunkingMqttClient(IMqttClient innerClient, ChunkingOptions? options = null)
3434
{
3535
_innerClient = innerClient ?? throw new ArgumentNullException(nameof(innerClient));
36-
_options = options ?? new ChunkingOptions();
37-
_messageSplitter = new ChunkedMessageSplitter(_options);
36+
_chunkingOptions = options ?? new ChunkingOptions();
37+
_messageSplitter = new ChunkedMessageSplitter(_chunkingOptions);
3838

3939
// Hook into the inner client's event
4040
_innerClient.ApplicationMessageReceivedAsync += HandleApplicationMessageReceivedAsync;
@@ -52,15 +52,23 @@ public ChunkingMqttClient(IMqttClient innerClient, ChunkingOptions? options = nu
5252
public event Func<MqttClientConnectedEventArgs, Task>? ConnectedAsync;
5353

5454
/// <inheritdoc/>
55-
public Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions options, CancellationToken cancellationToken = default)
55+
public async Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions options, CancellationToken cancellationToken = default)
5656
{
57-
return _innerClient.ConnectAsync(options, cancellationToken);
57+
var result = await _innerClient.ConnectAsync(options, cancellationToken);
58+
59+
UpdateMaxPacketSizeFromConnectResult(result);
60+
61+
return result;
5862
}
5963

6064
/// <inheritdoc/>
61-
public Task<MqttClientConnectResult> ConnectAsync(MqttConnectionSettings settings, CancellationToken cancellationToken = default)
65+
public async Task<MqttClientConnectResult> ConnectAsync(MqttConnectionSettings settings, CancellationToken cancellationToken = default)
6266
{
63-
return _innerClient.ConnectAsync(settings, cancellationToken);
67+
var result = await _innerClient.ConnectAsync(settings, cancellationToken);
68+
69+
UpdateMaxPacketSizeFromConnectResult(result);
70+
71+
return result;
6472
}
6573

6674
/// <inheritdoc/>
@@ -85,7 +93,7 @@ public Task SendEnhancedAuthenticationExchangeDataAsync(MqttEnhancedAuthenticati
8593
public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken = default)
8694
{
8795
// If chunking is disabled or the message is small enough, pass through to the inner client
88-
if (!_options.Enabled || applicationMessage.Payload.Length <= Utils.GetMaxChunkSize(_maxPacketSize, _options.StaticOverhead))
96+
if (!_chunkingOptions.Enabled || applicationMessage.Payload.Length <= Utils.GetMaxChunkSize(_maxPacketSize, _chunkingOptions.StaticOverhead))
8997
{
9098
return await _innerClient.PublishAsync(applicationMessage, cancellationToken).ConfigureAwait(false);
9199
}
@@ -131,6 +139,18 @@ public ValueTask DisposeAsync()
131139
return _innerClient.DisposeAsync();
132140
}
133141

142+
private void UpdateMaxPacketSizeFromConnectResult(MqttClientConnectResult result)
143+
{
144+
if (_chunkingOptions.Enabled && result.MaximumPacketSize is not > 0)
145+
{
146+
throw new InvalidOperationException("Chunking client requires a defined maximum packet size to function properly.");
147+
}
148+
149+
// TODO: @maximsemnov80 figure out how to set the max packet size on the broker side
150+
// Interlocked.Exchange(ref _maxPacketSize, (int)result.MaximumPacketSize!.Value);
151+
_maxPacketSize = 64*1024; // 64KB
152+
}
153+
134154
private async Task<MqttClientPublishResult> PublishChunkedMessageAsync(MqttApplicationMessage message, CancellationToken cancellationToken)
135155
{
136156
// Use the message splitter to split the message into chunks
@@ -191,7 +211,7 @@ private bool TryProcessChunk(
191211
// Get or create the message assembler
192212
var assembler = _messageAssemblers.GetOrAdd(
193213
metadata.MessageId,
194-
_ => new ChunkedMessageAssembler(metadata.TotalChunks ?? 0, _options.ChecksumAlgorithm));
214+
_ => new ChunkedMessageAssembler(metadata.TotalChunks ?? 0, _chunkingOptions.ChecksumAlgorithm));
195215

196216
// Add this chunk to the assembler
197217
if (assembler.AddChunk(metadata.ChunkIndex, args))
@@ -245,13 +265,6 @@ private static bool TryGetChunkMetadata(MqttApplicationMessage message, out Chun
245265

246266
private Task HandleConnectedAsync(MqttClientConnectedEventArgs args)
247267
{
248-
if (!args.ConnectResult.MaximumPacketSize.HasValue)
249-
{
250-
throw new InvalidOperationException("Chunking client requires a defined maximum packet size to function properly.");
251-
}
252-
253-
Interlocked.Exchange(ref _maxPacketSize, (int)args.ConnectResult.MaximumPacketSize.Value);
254-
255268
// Forward the event
256269
var handler = ConnectedAsync;
257270
return handler != null ? handler.Invoke(args) : Task.CompletedTask;

0 commit comments

Comments
 (0)