Skip to content

Commit 9ce060c

Browse files
progress
1 parent e4dd80f commit 9ce060c

File tree

4 files changed

+45
-77
lines changed

4 files changed

+45
-77
lines changed

dotnet/src/Azure.Iot.Operations.Protocol/Chunking/ChunkingConstants.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,9 @@ internal static class ChunkingConstants
4848
/// This accounts for MQTT packet headers, topic name, and other metadata.
4949
/// </summary>
5050
public const int DefaultStaticOverhead = 1024;
51+
52+
/// <summary>
53+
/// Reason string for successful chunked message transmission.
54+
/// </summary>
55+
public const string ChunkedMessageSuccessReasonString = "Chunked message successfully sent";
5156
}

dotnet/src/Azure.Iot.Operations.Protocol/Chunking/ChunkingMqttClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ private async Task<MqttClientPublishResult> PublishChunkedMessageAsync(MqttAppli
168168
return new MqttClientPublishResult(
169169
null,
170170
MqttClientPublishReasonCode.Success,
171-
string.Empty, //TODO: @maxim set the correct reason string, do we need any?
171+
ChunkingConstants.ChunkedMessageSuccessReasonString,
172172
new List<MqttUserProperty>(message.UserProperties ?? Enumerable.Empty<MqttUserProperty>()));
173173
}
174174

dotnet/test/Azure.Iot.Operations.Protocol.UnitTests/Chunking/ChunkedMessageAssemblerTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void TryReassemble_ReturnsValidMessageWhenComplete()
7676
// Arrange
7777
var assembler = new ChunkedMessageAssembler(2, ChunkingChecksumAlgorithm.SHA256);
7878
var chunk0 = CreateMqttMessageEventArgs("payload1");
79-
var chunk1 = CreateMqttMessageEventArgs("payload2");
79+
var chunk1 = CreateMqttMessageEventArgs(" payload2");
8080

8181
// Act
8282
assembler.AddChunk(0, chunk0);
@@ -89,13 +89,13 @@ public void TryReassemble_ReturnsValidMessageWhenComplete()
8989

9090
// Convert payload to string for easier assertion
9191
var payload = reassembledArgs!.ApplicationMessage.Payload;
92-
var combined = "";
92+
var assembledPayloadAsString = "";
9393
foreach (var segment in payload)
9494
{
95-
combined += Encoding.UTF8.GetString(segment.Span);
95+
assembledPayloadAsString += Encoding.UTF8.GetString(segment.Span);
9696
}
9797

98-
Assert.Equal("payload1payload2", combined);
98+
Assert.Equal("payload1 payload2", assembledPayloadAsString);
9999
}
100100

101101
[Fact]

dotnet/test/Azure.Iot.Operations.Protocol.UnitTests/Chunking/ChunkingMqttClientTests.cs

Lines changed: 35 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ public async Task PublishAsync_SmallMessage_PassesThroughToInnerClient()
2121
var expectedResult = new MqttClientPublishResult(
2222
null,
2323
MqttClientPublishReasonCode.Success,
24-
string.Empty,
24+
"No chunking result",
2525
new List<MqttUserProperty>());
2626

27+
MqttApplicationMessage? capturedMessage = null;
2728
mockInnerClient
2829
.Setup(c => c.PublishAsync(It.IsAny<MqttApplicationMessage>(), It.IsAny<CancellationToken>()))
30+
.Callback<MqttApplicationMessage, CancellationToken>((msg, _) => capturedMessage = msg)
2931
.ReturnsAsync(expectedResult);
3032

3133
// Configure connected client with MaxPacketSize
@@ -44,7 +46,7 @@ public async Task PublishAsync_SmallMessage_PassesThroughToInnerClient()
4446
var options = new ChunkingOptions
4547
{
4648
Enabled = true,
47-
StaticOverhead = 100 // Use small overhead for test
49+
StaticOverhead = 100
4850
};
4951

5052
var client = new ChunkingMqttClient(mockInnerClient.Object, options);
@@ -64,11 +66,9 @@ public async Task PublishAsync_SmallMessage_PassesThroughToInnerClient()
6466
var result = await client.PublishAsync(smallMessage, CancellationToken.None);
6567

6668
// Assert
67-
mockInnerClient.Verify(
68-
c => c.PublishAsync(It.Is<MqttApplicationMessage>(m => m == smallMessage), It.IsAny<CancellationToken>()),
69-
Times.Once);
70-
71-
Assert.Equal(expectedResult, result);
69+
Assert.NotEqual(ChunkingConstants.ChunkedMessageSuccessReasonString, result.ReasonString);
70+
Assert.NotNull(capturedMessage);
71+
Assert.Same(smallMessage, capturedMessage);
7272
}
7373

7474
[Fact]
@@ -78,14 +78,16 @@ public async Task PublishAsync_LargeMessage_ChunksMessageAndSendsMultipleMessage
7878
var mockInnerClient = new Mock<IMqttClient>();
7979
var publishedMessages = new List<MqttApplicationMessage>();
8080

81+
var mqttClientPublishResult = new MqttClientPublishResult(
82+
null,
83+
MqttClientPublishReasonCode.Success,
84+
"No chunking result",
85+
new List<MqttUserProperty>());
86+
8187
mockInnerClient
8288
.Setup(c => c.PublishAsync(It.IsAny<MqttApplicationMessage>(), It.IsAny<CancellationToken>()))
8389
.Callback<MqttApplicationMessage, CancellationToken>((msg, _) => publishedMessages.Add(msg))
84-
.ReturnsAsync(new MqttClientPublishResult(
85-
null,
86-
MqttClientPublishReasonCode.Success,
87-
string.Empty,
88-
new List<MqttUserProperty>()));
90+
.ReturnsAsync(mqttClientPublishResult);
8991

9092
// Configure connected client with MaxPacketSize
9193
mockInnerClient.SetupGet(c => c.IsConnected).Returns(true);
@@ -104,7 +106,7 @@ public async Task PublishAsync_LargeMessage_ChunksMessageAndSendsMultipleMessage
104106
var options = new ChunkingOptions
105107
{
106108
Enabled = true,
107-
StaticOverhead = 100, // Use small overhead for test
109+
StaticOverhead = 100,
108110
ChecksumAlgorithm = ChunkingChecksumAlgorithm.SHA256
109111
};
110112

@@ -130,43 +132,36 @@ public async Task PublishAsync_LargeMessage_ChunksMessageAndSendsMultipleMessage
130132
var result = await client.PublishAsync(largeMessage, CancellationToken.None);
131133

132134
// Assert
135+
Assert.Equal(ChunkingConstants.ChunkedMessageSuccessReasonString, result.ReasonString);
136+
133137
// Should have 3 chunks
134138
Assert.Equal(3, publishedMessages.Count);
135139

136140
// Verify all messages have the chunk metadata property
141+
var messageIds = new HashSet<string>();
137142
foreach (var msg in publishedMessages)
138143
{
139144
var chunkProperty = msg.UserProperties?.FirstOrDefault(p => p.Name == ChunkingConstants.ChunkUserProperty);
140145
Assert.NotNull(chunkProperty);
141146

142147
// Parse the metadata
143-
var metadata = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(chunkProperty!.Value);
148+
var metadata = JsonSerializer.Deserialize<ChunkMetadata>(chunkProperty!.Value);
144149
Assert.NotNull(metadata);
145-
146-
// Should have messageId and chunkIndex fields
147-
Assert.True(metadata!.ContainsKey(ChunkingConstants.MessageIdField));
148-
Assert.True(metadata.ContainsKey(ChunkingConstants.ChunkIndexField));
150+
Assert.NotEmpty(metadata!.MessageId);
151+
messageIds.Add(metadata.MessageId);
152+
Assert.True(metadata.ChunkIndex >= 0);
153+
Assert.True(metadata.Timeout == ChunkingConstants.DefaultChunkTimeout);
149154

150155
// First chunk should have totalChunks and checksum
151-
if (metadata[ChunkingConstants.ChunkIndexField].GetInt32() == 0)
156+
if (metadata.ChunkIndex == 0)
152157
{
153-
Assert.True(metadata.ContainsKey(ChunkingConstants.TotalChunksField));
154-
Assert.True(metadata.ContainsKey(ChunkingConstants.ChecksumField));
155-
Assert.Equal(3, metadata[ChunkingConstants.TotalChunksField].GetInt32());
158+
Assert.NotNull(metadata.TotalChunks);
159+
Assert.NotNull(metadata.Checksum);
160+
Assert.Equal(3, metadata.TotalChunks);
156161
}
157162
}
158163

159-
// Verify all chunks have the same messageId
160-
var messageId = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(
161-
publishedMessages[0].UserProperties!.First(p => p.Name == ChunkingConstants.ChunkUserProperty).Value)?
162-
[ChunkingConstants.MessageIdField].GetString();
163-
164-
foreach (var msg in publishedMessages)
165-
{
166-
var msgMetadata = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(
167-
msg.UserProperties!.First(p => p.Name == ChunkingConstants.ChunkUserProperty).Value);
168-
Assert.Equal(messageId, msgMetadata![ChunkingConstants.MessageIdField].GetString());
169-
}
164+
Assert.Single(messageIds); // All chunks should have the same messageId
170165

171166
// Verify total payload size across all chunks equals original payload size
172167
var totalChunkSize = publishedMessages.Sum(m => m.Payload.Length);
@@ -195,12 +190,7 @@ public async Task HandleApplicationMessageReceivedAsync_NonChunkedMessage_Passes
195190
{
196191
Payload = new ReadOnlySequence<byte>(payload)
197192
};
198-
199-
var receivedArgs = new MqttApplicationMessageReceivedEventArgs(
200-
"client1",
201-
message,
202-
1,
203-
(_, _) => Task.CompletedTask);
193+
var receivedArgs = new MqttApplicationMessageReceivedEventArgs("client1", message, 1, (_, _) => Task.CompletedTask);
204194

205195
// Act
206196
// Simulate receiving a message from the inner client
@@ -231,41 +221,19 @@ public async Task HandleApplicationMessageReceivedAsync_ChunkedMessage_Reassembl
231221
var messageId = Guid.NewGuid().ToString("D");
232222
var fullMessage = "This is a complete message after reassembly";
233223
var fullPayload = Encoding.UTF8.GetBytes(fullMessage);
234-
var checksum = ChecksumCalculator.CalculateChecksum(
235-
new ReadOnlySequence<byte>(fullPayload),
236-
ChunkingChecksumAlgorithm.SHA256);
224+
var checksum = ChecksumCalculator.CalculateChecksum(new ReadOnlySequence<byte>(fullPayload), ChunkingChecksumAlgorithm.SHA256);
237225

238226
// Create a chunked message with 2 parts
239227
var chunk1Text = "This is a complete ";
240228
var chunk2Text = "message after reassembly";
241229

242230
// Create first chunk with metadata
243-
var chunk1 = CreateChunkedMessage(
244-
"test/topic",
245-
chunk1Text,
246-
messageId,
247-
0,
248-
2,
249-
checksum);
231+
var chunk1 = CreateChunkedMessage("test/topic", chunk1Text, messageId, 0, 2, checksum);
250232

251233
// Create second chunk with metadata
252-
var chunk2 = CreateChunkedMessage(
253-
"test/topic",
254-
chunk2Text,
255-
messageId,
256-
1);
257-
258-
var receivedArgs1 = new MqttApplicationMessageReceivedEventArgs(
259-
"client1",
260-
chunk1,
261-
1,
262-
(_, _) => Task.CompletedTask);
263-
264-
var receivedArgs2 = new MqttApplicationMessageReceivedEventArgs(
265-
"client1",
266-
chunk2,
267-
2,
268-
(_, _) => Task.CompletedTask);
234+
var chunk2 = CreateChunkedMessage("test/topic", chunk2Text, messageId, 1);
235+
var receivedArgs1 = new MqttApplicationMessageReceivedEventArgs("client1", chunk1, 1, (_, _) => Task.CompletedTask);
236+
var receivedArgs2 = new MqttApplicationMessageReceivedEventArgs("client1", chunk2, 2, (_, _) => Task.CompletedTask);
269237

270238
// Act
271239
// Simulate receiving chunks from the inner client
@@ -276,12 +244,7 @@ public async Task HandleApplicationMessageReceivedAsync_ChunkedMessage_Reassembl
276244
Assert.True(handlerCalled);
277245
Assert.NotNull(capturedArgs);
278246

279-
// Verify reassembled payload matches the original
280-
var payload = capturedArgs!.ApplicationMessage.Payload;
281-
var combined = "";
282-
foreach (var segment in payload) combined += Encoding.UTF8.GetString(segment.Span);
283-
284-
Assert.Equal(fullMessage, combined);
247+
Assert.Equal(fullPayload, capturedArgs!.ApplicationMessage.Payload.ToArray());
285248

286249
// Verify chunk metadata was removed
287250
Assert.DoesNotContain(

0 commit comments

Comments
 (0)