Skip to content

Commit 1a7e5e8

Browse files
progress
1 parent e0a066c commit 1a7e5e8

File tree

2 files changed

+4
-301
lines changed

2 files changed

+4
-301
lines changed

dotnet/src/Azure.Iot.Operations.Protocol/Chunking/ChunkingMqttPubSubClient.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ private void UpdateMaxPacketSizeFromConnectResult(MqttClientConnectResult? resul
9797
throw new InvalidOperationException("Chunking client requires a defined maximum packet size to function properly.");
9898
}
9999

100-
Interlocked.Exchange(ref _maxPacketSize, (int)result!.MaximumPacketSize!.Value);
100+
// _maxPacketSize = (int)result!.MaximumPacketSize!.Value;
101+
_maxPacketSize = 64 * 1024;
101102
}
102103

103104
private async Task<MqttClientPublishResult> PublishChunkedMessageAsync(MqttApplicationMessage message, CancellationToken cancellationToken)

dotnet/test/Azure.Iot.Operations.Protocol.IntegrationTests/Chunking/ChunkingMqttClientIntegrationTests.cs

Lines changed: 2 additions & 300 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,8 @@
22
// Licensed under the MIT License.
33

44
using Azure.Iot.Operations.Protocol.Chunking;
5-
using Azure.Iot.Operations.Protocol.Events;
65
using Azure.Iot.Operations.Protocol.Models;
7-
using Azure.Iot.Operations.Mqtt;
8-
using System;
96
using System.Buffers;
10-
using System.Collections.Generic;
11-
using System.Linq;
12-
using System.Text;
13-
using System.Threading;
14-
using System.Threading.Tasks;
15-
using Xunit;
167

178
namespace Azure.Iot.Operations.Protocol.IntegrationTests.Chunking
189
{
@@ -117,9 +108,8 @@ public async Task ChunkingMqttClient_LargeMessage_ChunkingAndReassembly()
117108
var topic = $"chunking/test/{Guid.NewGuid()}";
118109
await chunkingClient.SubscribeAsync(new MqttClientSubscribeOptions(topic, MqttQualityOfServiceLevel.AtLeastOnce));
119110

120-
// Create a large message - 100KB payload to force chunking
121-
// Most MQTT brokers have default max packet size <= 64KB
122-
var largePayloadSize = 1024 * 1024; // 1MB to ensure chunking
111+
// TODO: @maximsemenov80 for the test purpose UpdateMaxPacketSizeFromConnectResult artificially set MaxPacketSize to 64KB
112+
var largePayloadSize = 1024 * 1024; // 1MB
123113
var largePayload = new byte[largePayloadSize];
124114

125115
// Fill with recognizable pattern for verification
@@ -171,293 +161,5 @@ public async Task ChunkingMqttClient_LargeMessage_ChunkingAndReassembly()
171161
Assert.NotNull(testProperty);
172162
Assert.Equal("testValue", testProperty!.Value);
173163
}
174-
175-
/*
176-
[Fact]
177-
public async Task ChunkingMqttClient_MessageWithComplexProperties_PreservesAllProperties()
178-
{
179-
// Arrange
180-
// Create a base client
181-
var baseClient = await ClientFactory.CreateClientAsyncFromEnvAsync(Guid.NewGuid().ToString());
182-
183-
// Create a chunking client with settings that force chunking
184-
var options = new ChunkingOptions
185-
{
186-
Enabled = true,
187-
StaticOverhead = 500,
188-
ChunkTimeout = TimeSpan.FromSeconds(30)
189-
};
190-
191-
await using var chunkingClient = new ChunkingMqttClient(baseClient, options);
192-
193-
var messageReceivedTcs = new TaskCompletionSource<MqttApplicationMessage>();
194-
chunkingClient.ApplicationMessageReceivedAsync += (args) =>
195-
{
196-
messageReceivedTcs.TrySetResult(args.ApplicationMessage);
197-
return Task.CompletedTask;
198-
};
199-
200-
// Subscribe to a unique topic
201-
var topic = $"chunking/test/{Guid.NewGuid()}";
202-
await chunkingClient.SubscribeAsync(new MqttClientSubscribeOptions(topic, MqttQualityOfServiceLevel.AtLeastOnce));
203-
204-
// Create a large message with various MQTT properties
205-
var payloadSize = 50 * 1024; // 50KB to ensure chunking
206-
var payload = new byte[payloadSize];
207-
Random.Shared.NextBytes(payload);
208-
209-
var correlationData = Encoding.UTF8.GetBytes("correlation-data-value");
210-
211-
var message = new MqttApplicationMessage(topic, MqttQualityOfServiceLevel.ExactlyOnce)
212-
{
213-
Payload = new ReadOnlySequence<byte>(payload),
214-
ContentType = "application/json",
215-
ResponseTopic = "response/topic/path",
216-
CorrelationData = new ReadOnlySequence<byte>(correlationData),
217-
PayloadFormatIndicator = MqttPayloadFormatIndicator.Utf8,
218-
MessageExpiryInterval = 3600,
219-
Retain = true,
220-
UserProperties = new List<MqttUserProperty>
221-
{
222-
new("prop1", "value1"),
223-
new("prop2", "value2"),
224-
new("prop3", "value3")
225-
}
226-
};
227-
228-
// Act
229-
var publishResult = await chunkingClient.PublishAsync(message);
230-
231-
// Wait for the reassembled message to be received
232-
MqttApplicationMessage? receivedMessage = null;
233-
try
234-
{
235-
receivedMessage = await messageReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(30));
236-
}
237-
catch (TimeoutException)
238-
{
239-
Assert.Fail("Timed out waiting for the reassembled message to be received");
240-
}
241-
242-
// Assert
243-
Assert.NotNull(receivedMessage);
244-
245-
// Verify all properties were preserved
246-
Assert.Equal(message.Topic, receivedMessage.Topic);
247-
Assert.Equal(message.QualityOfServiceLevel, receivedMessage.QualityOfServiceLevel);
248-
Assert.Equal(message.ContentType, receivedMessage.ContentType);
249-
Assert.Equal(message.ResponseTopic, receivedMessage.ResponseTopic);
250-
Assert.Equal(correlationData, receivedMessage.CorrelationData.ToArray());
251-
Assert.Equal(message.PayloadFormatIndicator, receivedMessage.PayloadFormatIndicator);
252-
Assert.Equal(message.MessageExpiryInterval, receivedMessage.MessageExpiryInterval);
253-
Assert.Equal(message.Retain, receivedMessage.Retain);
254-
255-
// Verify user properties were preserved
256-
Assert.Contains(receivedMessage.UserProperties!, p => p.Name == "prop1" && p.Value == "value1");
257-
Assert.Contains(receivedMessage.UserProperties!, p => p.Name == "prop2" && p.Value == "value2");
258-
Assert.Contains(receivedMessage.UserProperties!, p => p.Name == "prop3" && p.Value == "value3");
259-
260-
await chunkingClient.DisconnectAsync();
261-
}
262-
263-
[Fact]
264-
public async Task ChunkingMqttClient_MultipleClients_CanExchangeChunkedMessages()
265-
{
266-
// Arrange
267-
// Create two base clients
268-
var baseClient1 = await ClientFactory.CreateClientAsyncFromEnvAsync(Guid.NewGuid().ToString());
269-
var baseClient2 = await ClientFactory.CreateClientAsyncFromEnvAsync(Guid.NewGuid().ToString());
270-
271-
// Create chunking clients
272-
var options = new ChunkingOptions
273-
{
274-
Enabled = true,
275-
StaticOverhead = 500,
276-
ChunkTimeout = TimeSpan.FromSeconds(30)
277-
};
278-
279-
await using var chunkingClient1 = new ChunkingMqttClient(baseClient1, options);
280-
await using var chunkingClient2 = new ChunkingMqttClient(baseClient2, options);
281-
282-
var messageReceivedTcs = new TaskCompletionSource<MqttApplicationMessage>();
283-
chunkingClient2.ApplicationMessageReceivedAsync += (args) =>
284-
{
285-
messageReceivedTcs.TrySetResult(args.ApplicationMessage);
286-
return Task.CompletedTask;
287-
};
288-
289-
// Subscribe client2 to a unique topic
290-
var topic = $"chunking/test/{Guid.NewGuid()}";
291-
await chunkingClient2.SubscribeAsync(new MqttClientSubscribeOptions(topic, MqttQualityOfServiceLevel.AtLeastOnce));
292-
293-
// Wait briefly to ensure subscription is established
294-
await Task.Delay(1000);
295-
296-
// Create a large message on client1
297-
var payloadSize = 80 * 1024; // 80KB
298-
var payload = new byte[payloadSize];
299-
Random.Shared.NextBytes(payload);
300-
301-
var message = new MqttApplicationMessage(topic, MqttQualityOfServiceLevel.AtLeastOnce)
302-
{
303-
Payload = new ReadOnlySequence<byte>(payload)
304-
};
305-
306-
// Act
307-
var publishResult = await chunkingClient1.PublishAsync(message);
308-
309-
// Wait for client2 to receive the reassembled message
310-
MqttApplicationMessage? receivedMessage = null;
311-
try
312-
{
313-
receivedMessage = await messageReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(30));
314-
}
315-
catch (TimeoutException)
316-
{
317-
Assert.Fail("Timed out waiting for the message to be received");
318-
}
319-
320-
// Assert
321-
Assert.NotNull(receivedMessage);
322-
Assert.Equal(payloadSize, receivedMessage.Payload.Length);
323-
Assert.Equal(payload, receivedMessage.Payload.ToArray());
324-
325-
await chunkingClient1.DisconnectAsync();
326-
await chunkingClient2.DisconnectAsync();
327-
}
328-
329-
[Fact]
330-
public async Task ChunkingMqttClient_Reconnection_ClearsInProgressReassembly()
331-
{
332-
// This test verifies that incomplete reassembly state is properly cleared on disconnect
333-
334-
// Arrange
335-
var baseClient = await ClientFactory.CreateClientAsyncFromEnvAsync(Guid.NewGuid().ToString());
336-
337-
var options = new ChunkingOptions
338-
{
339-
Enabled = true,
340-
StaticOverhead = 500,
341-
ChunkTimeout = TimeSpan.FromMinutes(5) // Long timeout to ensure it doesn't expire naturally
342-
};
343-
344-
await using var chunkingClient = new ChunkingMqttClient(baseClient, options);
345-
346-
// Counter for message reception
347-
int messagesReceived = 0;
348-
var firstMessageTcs = new TaskCompletionSource<MqttApplicationMessage>();
349-
var secondMessageTcs = new TaskCompletionSource<MqttApplicationMessage>();
350-
351-
chunkingClient.ApplicationMessageReceivedAsync += (args) =>
352-
{
353-
messagesReceived++;
354-
if (messagesReceived == 1)
355-
{
356-
firstMessageTcs.TrySetResult(args.ApplicationMessage);
357-
}
358-
else if (messagesReceived == 2)
359-
{
360-
secondMessageTcs.TrySetResult(args.ApplicationMessage);
361-
}
362-
return Task.CompletedTask;
363-
};
364-
365-
// Subscribe to a topic
366-
var topic = $"chunking/test/{Guid.NewGuid()}";
367-
await chunkingClient.SubscribeAsync(new MqttClientSubscribeOptions(topic, MqttQualityOfServiceLevel.AtLeastOnce));
368-
369-
// Create two identical messages
370-
var payload = new byte[70 * 1024]; // Large enough to ensure chunking
371-
Random.Shared.NextBytes(payload);
372-
373-
var message = new MqttApplicationMessage(topic, MqttQualityOfServiceLevel.AtLeastOnce)
374-
{
375-
Payload = new ReadOnlySequence<byte>(payload)
376-
};
377-
378-
// Act - Part 1: Send first message
379-
await chunkingClient.PublishAsync(message);
380-
381-
// Wait for first message to arrive
382-
var firstMessage = await firstMessageTcs.Task.WaitAsync(TimeSpan.FromSeconds(30));
383-
Assert.NotNull(firstMessage);
384-
385-
// Disconnect and reconnect
386-
await chunkingClient.DisconnectAsync();
387-
await Task.Delay(1000); // Brief pause
388-
await chunkingClient.ReconnectAsync();
389-
390-
// Resubscribe
391-
await chunkingClient.SubscribeAsync(new MqttClientSubscribeOptions(topic, MqttQualityOfServiceLevel.AtLeastOnce));
392-
await Task.Delay(1000); // Brief pause to ensure subscription is established
393-
394-
// Act - Part 2: Send second message
395-
await chunkingClient.PublishAsync(message);
396-
397-
// Wait for second message
398-
var secondMessage = await secondMessageTcs.Task.WaitAsync(TimeSpan.FromSeconds(30));
399-
400-
// Assert
401-
Assert.NotNull(secondMessage);
402-
Assert.Equal(2, messagesReceived); // Both messages should be received and reassembled
403-
Assert.Equal(payload, secondMessage.Payload.ToArray());
404-
405-
await chunkingClient.DisconnectAsync();
406-
}
407-
408-
[Fact(Skip = "This test requires special broker configuration and manual verification")]
409-
public async Task ChunkingMqttClient_MessageExceedingBrokerMaxSize_HandlesProperly()
410-
{
411-
// This test requires a broker with a known maximum message size
412-
// The test would need to be adjusted based on the broker configuration
413-
414-
// Arrange
415-
var baseClient = await ClientFactory.CreateClientAsyncFromEnvAsync(Guid.NewGuid().ToString());
416-
417-
var options = new ChunkingOptions
418-
{
419-
Enabled = true,
420-
StaticOverhead = 500,
421-
ChunkTimeout = TimeSpan.FromSeconds(30)
422-
};
423-
424-
await using var chunkingClient = new ChunkingMqttClient(baseClient, options);
425-
426-
var messageReceivedTcs = new TaskCompletionSource<MqttApplicationMessage>();
427-
chunkingClient.ApplicationMessageReceivedAsync += (args) =>
428-
{
429-
messageReceivedTcs.TrySetResult(args.ApplicationMessage);
430-
return Task.CompletedTask;
431-
};
432-
433-
// Subscribe to a topic
434-
var topic = $"chunking/test/{Guid.NewGuid()}";
435-
await chunkingClient.SubscribeAsync(new MqttClientSubscribeOptions(topic, MqttQualityOfServiceLevel.AtLeastOnce));
436-
437-
// Create a very large message (adjust size based on broker limits)
438-
// Example for a broker with 256KB max packet size:
439-
var payloadSize = 500 * 1024; // 500KB to ensure exceeding broker limits
440-
var payload = new byte[payloadSize];
441-
Random.Shared.NextBytes(payload);
442-
443-
var message = new MqttApplicationMessage(topic, MqttQualityOfServiceLevel.AtLeastOnce)
444-
{
445-
Payload = new ReadOnlySequence<byte>(payload)
446-
};
447-
448-
// Act
449-
var publishResult = await chunkingClient.PublishAsync(message);
450-
451-
// Wait for reassembled message
452-
var receivedMessage = await messageReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(60));
453-
454-
// Assert
455-
Assert.NotNull(receivedMessage);
456-
Assert.Equal(payloadSize, receivedMessage.Payload.Length);
457-
Assert.Equal(payload, receivedMessage.Payload.ToArray());
458-
459-
await chunkingClient.DisconnectAsync();
460-
}
461-
*/
462164
}
463165
}

0 commit comments

Comments
 (0)