Skip to content
This repository was archived by the owner on Jan 10, 2025. It is now read-only.

Commit 8b75fdb

Browse files
mauroakzu
authored andcommitted
Added Integration Tests to cover IMqttClient.MessageStream fixes
Also fixed existing tests that started to fail after the fix was applied
1 parent 0f25b71 commit 8b75fdb

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

src/IntegrationTests/ConnectionSpec.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,25 @@ public async Task when_client_disconnects_unexpectedly_then_will_message_is_sent
502502
client3.Dispose();
503503
}
504504

505+
[Fact]
506+
public async Task when_client_disconnects_then_message_stream_completes()
507+
{
508+
var streamCompletedSignal = new ManualResetEventSlim(initialState: false);
509+
var client = await GetClientAsync();
510+
511+
await client.ConnectAsync(new MqttClientCredentials(GetClientId()));
512+
513+
client.MessageStream.Subscribe(_ => { }, onCompleted: () => streamCompletedSignal.Set());
514+
515+
await client.DisconnectAsync();
516+
517+
var streamCompleted = streamCompletedSignal.Wait(2000);
518+
519+
Assert.True(streamCompleted);
520+
521+
client.Dispose();
522+
}
523+
505524
public void Dispose() => server?.Dispose();
506525
}
507526
}

src/IntegrationTests/PublishingSpec.cs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,8 +524,19 @@ await subscriber
524524
});
525525

526526
await subscriber.DisconnectAsync();
527+
527528
var sessionState = await subscriber.ConnectAsync(new MqttClientCredentials(subscriberId), cleanSession: false);
528529

530+
subscriber
531+
.MessageStream
532+
.Where(m => m.Topic == topic)
533+
.Subscribe(m => {
534+
subscriberReceived++;
535+
536+
if (subscriberReceived == count)
537+
subscriberDone.Set();
538+
});
539+
529540
var tasks = new List<Task>();
530541

531542
for (var i = 1; i <= count; i++)
@@ -608,6 +619,80 @@ await subscriber.UnsubscribeAsync(topic)
608619
publisher.Dispose();
609620
}
610621

622+
[Fact]
623+
public async Task when_publish_messages_and_client_disconnects_then_message_stream_is_reset()
624+
{
625+
var topic = Guid.NewGuid().ToString();
626+
627+
var publisher = await GetClientAsync();
628+
var subscriber = await GetClientAsync();
629+
var subscriberId = subscriber.Id;
630+
631+
var goal = default(int);
632+
var goalAchieved = new ManualResetEventSlim();
633+
var received = 0;
634+
635+
await subscriber.SubscribeAsync(topic, MqttQualityOfService.AtMostOnce).ConfigureAwait(continueOnCapturedContext: false);
636+
637+
subscriber
638+
.MessageStream
639+
.Subscribe(m => {
640+
if (m.Topic == topic)
641+
{
642+
received++;
643+
644+
if (received == goal)
645+
goalAchieved.Set();
646+
}
647+
});
648+
649+
goal = 5;
650+
651+
var tasks = new List<Task>();
652+
653+
for (var i = 1; i <= goal; i++)
654+
{
655+
var testMessage = GetTestMessage(i);
656+
var message = new MqttApplicationMessage(topic, Serializer.Serialize(testMessage));
657+
658+
tasks.Add(publisher.PublishAsync(message, MqttQualityOfService.AtMostOnce));
659+
}
660+
661+
await Task.WhenAll(tasks);
662+
663+
var completed = goalAchieved.Wait(TimeSpan.FromSeconds(Configuration.WaitTimeoutSecs));
664+
665+
Assert.True(completed);
666+
Assert.Equal(goal, received);
667+
668+
await subscriber.DisconnectAsync();
669+
670+
goal = 3;
671+
goalAchieved.Reset();
672+
received = 0;
673+
completed = false;
674+
675+
await subscriber.ConnectAsync(new MqttClientCredentials(subscriberId), cleanSession: false);
676+
677+
for (var i = 1; i <= goal; i++)
678+
{
679+
var testMessage = GetTestMessage(i);
680+
var message = new MqttApplicationMessage(topic, Serializer.Serialize(testMessage));
681+
682+
tasks.Add(publisher.PublishAsync(message, MqttQualityOfService.AtMostOnce));
683+
}
684+
685+
completed = goalAchieved.Wait(TimeSpan.FromSeconds(Configuration.WaitTimeoutSecs));
686+
687+
Assert.False(completed);
688+
Assert.Equal(0, received);
689+
690+
await subscriber.UnsubscribeAsync(topic).ConfigureAwait(continueOnCapturedContext: false);
691+
692+
subscriber.Dispose();
693+
publisher.Dispose();
694+
}
695+
611696
public void Dispose ()
612697
{
613698
if (server != null) {

0 commit comments

Comments
 (0)