Skip to content

Commit b8c69cf

Browse files
progress
1 parent 9ce060c commit b8c69cf

File tree

3 files changed

+106
-83
lines changed

3 files changed

+106
-83
lines changed

dotnet/src/Azure.Iot.Operations.Protocol/Chunking/ChunkedMessageSplitter.cs

Lines changed: 70 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,42 @@ public ChunkedMessageSplitter(ChunkingOptions options)
3232
/// <param name="message">The original message to split.</param>
3333
/// <param name="maxPacketSize">The maximum packet size allowed.</param>
3434
/// <returns>A list of chunked messages.</returns>
35-
public IReadOnlyList<MqttApplicationMessage> SplitMessage(MqttApplicationMessage message, int maxPacketSize)
35+
public IReadOnlyList<MqttApplicationMessage> SplitMessage(MqttApplicationMessage message, int maxPacketSize)
36+
{
37+
var maxChunkSize = ValidateAndGetMaxChunkSize(message, maxPacketSize);
38+
var (payload, totalChunks, messageId, checksum, userProperties) = PrepareChunkingMetadata(message, maxChunkSize);
39+
40+
// Create chunks
41+
var chunks = new List<MqttApplicationMessage>(totalChunks);
42+
43+
for (var chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++)
44+
{
45+
var chunkPayload = ChunkedMessageSplitter.ExtractChunkPayload(payload, chunkIndex, maxChunkSize);
46+
var chunkMessage = CreateChunk(message, chunkPayload, userProperties, messageId, chunkIndex, totalChunks, checksum);
47+
chunks.Add(chunkMessage);
48+
}
49+
50+
return chunks;
51+
}
52+
53+
private int ValidateAndGetMaxChunkSize(MqttApplicationMessage message, int maxPacketSize)
3654
{
3755
ArgumentNullException.ThrowIfNull(message);
3856
ArgumentOutOfRangeException.ThrowIfLessThan(maxPacketSize, 128); // minimum MQTT 5.0 protocol compliance.
3957

4058
// Calculate the maximum size for each chunk's payload
41-
var maxChunkSize = GetMaxChunkSize(maxPacketSize);
59+
var maxChunkSize = Utils.GetMaxChunkSize(maxPacketSize, _options.StaticOverhead);
4260
if (message.Payload.Length <= maxChunkSize)
4361
{
4462
throw new ArgumentException($"Message size {message.Payload.Length} is less than the maximum chunk size {maxChunkSize}.", nameof(message));
4563
}
4664

65+
return maxChunkSize;
66+
}
67+
68+
private (ReadOnlySequence<byte> Payload, int TotalChunks, string MessageId, string Checksum, List<MqttUserProperty> UserProperties)
69+
PrepareChunkingMetadata(MqttApplicationMessage message, int maxChunkSize)
70+
{
4771
var payload = message.Payload;
4872
var totalChunks = (int)Math.Ceiling((double)payload.Length / maxChunkSize);
4973

@@ -56,56 +80,53 @@ public IReadOnlyList<MqttApplicationMessage> SplitMessage(MqttApplicationMessage
5680
// Create a copy of the user properties
5781
var userProperties = new List<MqttUserProperty>(message.UserProperties ?? Enumerable.Empty<MqttUserProperty>());
5882

59-
// Create chunks
60-
var chunks = new List<MqttApplicationMessage>(totalChunks);
61-
62-
for (var chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++)
63-
{
64-
// Create chunk metadata
65-
var metadata = chunkIndex == 0
66-
? ChunkMetadata.CreateFirstChunk(messageId, totalChunks, checksum, _options.ChunkTimeout)
67-
: ChunkMetadata.CreateSubsequentChunk(messageId, chunkIndex, _options.ChunkTimeout);
68-
69-
// Serialize the metadata to JSON
70-
var metadataJson = JsonSerializer.Serialize(metadata);
71-
72-
// Create user properties for this chunk
73-
var chunkUserProperties = new List<MqttUserProperty>(userProperties)
74-
{
75-
// Add the chunk metadata property
76-
new(ChunkingConstants.ChunkUserProperty, metadataJson)
77-
};
78-
79-
// Extract the chunk payload
80-
var chunkStart = (long)chunkIndex * maxChunkSize;
81-
var chunkLength = Math.Min(maxChunkSize, payload.Length - chunkStart);
82-
var chunkPayload = payload.Slice(chunkStart, chunkLength);
83-
84-
// Create a message for this chunk
85-
var chunkMessage = new MqttApplicationMessage(message.Topic, message.QualityOfServiceLevel)
86-
{
87-
Retain = message.Retain,
88-
Payload = chunkPayload,
89-
ContentType = message.ContentType,
90-
ResponseTopic = message.ResponseTopic,
91-
CorrelationData = message.CorrelationData,
92-
PayloadFormatIndicator = message.PayloadFormatIndicator,
93-
MessageExpiryInterval = message.MessageExpiryInterval,
94-
TopicAlias = message.TopicAlias,
95-
SubscriptionIdentifiers = message.SubscriptionIdentifiers,
96-
UserProperties = chunkUserProperties
97-
};
98-
99-
chunks.Add(chunkMessage);
100-
}
83+
return (payload, totalChunks, messageId, checksum, userProperties);
84+
}
10185

102-
return chunks;
86+
private static ReadOnlySequence<byte> ExtractChunkPayload(ReadOnlySequence<byte> payload, int chunkIndex, int maxChunkSize)
87+
{
88+
var chunkStart = (long)chunkIndex * maxChunkSize;
89+
var chunkLength = Math.Min(maxChunkSize, payload.Length - chunkStart);
90+
return payload.Slice(chunkStart, chunkLength);
10391
}
10492

105-
private int GetMaxChunkSize(int maxPacketSize)
93+
private MqttApplicationMessage CreateChunk(
94+
MqttApplicationMessage originalMessage,
95+
ReadOnlySequence<byte> chunkPayload,
96+
List<MqttUserProperty> userProperties,
97+
string messageId,
98+
int chunkIndex,
99+
int totalChunks,
100+
string checksum)
106101
{
107-
ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(maxPacketSize, _options.StaticOverhead);
108-
// Subtract the static overhead to ensure we don't exceed the broker's limits
109-
return maxPacketSize - _options.StaticOverhead;
102+
// Create chunk metadata
103+
var metadata = chunkIndex == 0
104+
? ChunkMetadata.CreateFirstChunk(messageId, totalChunks, checksum, _options.ChunkTimeout)
105+
: ChunkMetadata.CreateSubsequentChunk(messageId, chunkIndex, _options.ChunkTimeout);
106+
107+
// Serialize the metadata to JSON
108+
var metadataJson = JsonSerializer.Serialize(metadata);
109+
110+
// Create user properties for this chunk
111+
var chunkUserProperties = new List<MqttUserProperty>(userProperties)
112+
{
113+
// Add the chunk metadata property
114+
new(ChunkingConstants.ChunkUserProperty, metadataJson)
115+
};
116+
117+
// Create a message for this chunk
118+
return new MqttApplicationMessage(originalMessage.Topic, originalMessage.QualityOfServiceLevel)
119+
{
120+
Retain = originalMessage.Retain,
121+
Payload = chunkPayload,
122+
ContentType = originalMessage.ContentType,
123+
ResponseTopic = originalMessage.ResponseTopic,
124+
CorrelationData = originalMessage.CorrelationData,
125+
PayloadFormatIndicator = originalMessage.PayloadFormatIndicator,
126+
MessageExpiryInterval = originalMessage.MessageExpiryInterval,
127+
TopicAlias = originalMessage.TopicAlias,
128+
SubscriptionIdentifiers = originalMessage.SubscriptionIdentifiers,
129+
UserProperties = chunkUserProperties
130+
};
110131
}
111132
}

dotnet/src/Azure.Iot.Operations.Protocol/Chunking/ChunkingMqttClient.cs

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -52,31 +52,15 @@ public ChunkingMqttClient(IMqttClient innerClient, ChunkingOptions? options = nu
5252
public event Func<MqttClientConnectedEventArgs, Task>? ConnectedAsync;
5353

5454
/// <inheritdoc/>
55-
public async Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions options, CancellationToken cancellationToken = default)
55+
public Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions options, CancellationToken cancellationToken = default)
5656
{
57-
var result = await _innerClient.ConnectAsync(options, cancellationToken).ConfigureAwait(false);
58-
59-
if (!result.MaximumPacketSize.HasValue)
60-
{
61-
throw new InvalidOperationException("Chunking client requires a defined maximum packet size to function properly.");
62-
}
63-
64-
_maxPacketSize = (int)result.MaximumPacketSize.Value;
65-
return result;
57+
return _innerClient.ConnectAsync(options, cancellationToken);
6658
}
6759

6860
/// <inheritdoc/>
69-
public async Task<MqttClientConnectResult> ConnectAsync(MqttConnectionSettings settings, CancellationToken cancellationToken = default)
61+
public Task<MqttClientConnectResult> ConnectAsync(MqttConnectionSettings settings, CancellationToken cancellationToken = default)
7062
{
71-
var result = await _innerClient.ConnectAsync(settings, cancellationToken).ConfigureAwait(false);
72-
73-
if (!result.MaximumPacketSize.HasValue)
74-
{
75-
throw new InvalidOperationException("Chunking client requires a defined maximum packet size to function properly.");
76-
}
77-
78-
_maxPacketSize = (int)result.MaximumPacketSize;
79-
return result;
63+
return _innerClient.ConnectAsync(settings, cancellationToken);
8064
}
8165

8266
/// <inheritdoc/>
@@ -101,7 +85,7 @@ public Task SendEnhancedAuthenticationExchangeDataAsync(MqttEnhancedAuthenticati
10185
public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken = default)
10286
{
10387
// If chunking is disabled or the message is small enough, pass through to the inner client
104-
if (!_options.Enabled || applicationMessage.Payload.Length <= GetMaxChunkSize())
88+
if (!_options.Enabled || applicationMessage.Payload.Length <= Utils.GetMaxChunkSize(_maxPacketSize, _options.StaticOverhead))
10589
{
10690
return await _innerClient.PublishAsync(applicationMessage, cancellationToken).ConfigureAwait(false);
10791
}
@@ -147,12 +131,6 @@ public ValueTask DisposeAsync()
147131
return _innerClient.DisposeAsync();
148132
}
149133

150-
private int GetMaxChunkSize()
151-
{
152-
// Subtract the static overhead to ensure we don't exceed the broker's limits
153-
return Math.Max(0, _maxPacketSize - _options.StaticOverhead);
154-
}
155-
156134
private async Task<MqttClientPublishResult> PublishChunkedMessageAsync(MqttApplicationMessage message, CancellationToken cancellationToken)
157135
{
158136
// Use the message splitter to split the message into chunks
@@ -175,12 +153,13 @@ private async Task<MqttClientPublishResult> PublishChunkedMessageAsync(MqttAppli
175153
private async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
176154
{
177155
// Check if this is a chunked message
156+
var onApplicationMessageReceivedAsync = ApplicationMessageReceivedAsync;
178157
if (!TryGetChunkMetadata(args.ApplicationMessage, out var chunkMetadata))
179158
{
180159
// Not a chunked message, pass it through
181-
if (ApplicationMessageReceivedAsync != null)
160+
if (onApplicationMessageReceivedAsync != null)
182161
{
183-
await ApplicationMessageReceivedAsync.Invoke(args).ConfigureAwait(false);
162+
await onApplicationMessageReceivedAsync.Invoke(args).ConfigureAwait(false);
184163
}
185164

186165
return;
@@ -190,9 +169,9 @@ private async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageR
190169
if (TryProcessChunk(args, chunkMetadata!, out var reassembledArgs))
191170
{
192171
// We have a complete message, invoke the event
193-
if (ApplicationMessageReceivedAsync != null && reassembledArgs != null)
172+
if (onApplicationMessageReceivedAsync != null && reassembledArgs != null)
194173
{
195-
await ApplicationMessageReceivedAsync.Invoke(reassembledArgs).ConfigureAwait(false);
174+
await onApplicationMessageReceivedAsync.Invoke(reassembledArgs).ConfigureAwait(false);
196175
}
197176
}
198177
else
@@ -271,10 +250,11 @@ private Task HandleConnectedAsync(MqttClientConnectedEventArgs args)
271250
throw new InvalidOperationException("Chunking client requires a defined maximum packet size to function properly.");
272251
}
273252

274-
_maxPacketSize = (int)args.ConnectResult.MaximumPacketSize.Value;
253+
Interlocked.Exchange(ref _maxPacketSize, (int)args.ConnectResult.MaximumPacketSize.Value);
275254

276255
// Forward the event
277-
return ConnectedAsync?.Invoke(args) ?? Task.CompletedTask;
256+
var handler = ConnectedAsync;
257+
return handler != null ? handler.Invoke(args) : Task.CompletedTask;
278258
}
279259

280260
private Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs args)
@@ -283,6 +263,7 @@ private Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs args)
283263
_messageAssemblers.Clear();
284264

285265
// Forward the event
286-
return DisconnectedAsync?.Invoke(args) ?? Task.CompletedTask;
266+
var handler = DisconnectedAsync;
267+
return handler != null ? handler.Invoke(args) : Task.CompletedTask;
287268
}
288269
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
6+
namespace Azure.Iot.Operations.Protocol.Chunking;
7+
8+
public static class Utils
9+
{
10+
/// <summary>
11+
/// Calculates the maximum size for a message chunk based on max packet size and overhead.
12+
/// </summary>
13+
/// <param name="maxPacketSize">The maximum packet size allowed by the broker.</param>
14+
/// <param name="staticOverhead">The static overhead to account for in each chunk.</param>
15+
/// <returns>The maximum size that can be used for a message chunk.</returns>
16+
public static int GetMaxChunkSize(int maxPacketSize, int staticOverhead)
17+
{
18+
ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(maxPacketSize, staticOverhead);
19+
return maxPacketSize - staticOverhead;
20+
}
21+
}

0 commit comments

Comments
 (0)