Skip to content

Commit ce20b31

Browse files
committed
Ensure message bus tests are being cleaned up properly
1 parent a31861c commit ce20b31

File tree

2 files changed

+36
-30
lines changed

2 files changed

+36
-30
lines changed

src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics;
44
using System.Linq;
@@ -37,7 +37,7 @@ protected virtual Task CleanupMessageBusAsync(IMessageBus messageBus)
3737

3838
public virtual async Task CanUseMessageOptionsAsync()
3939
{
40-
var messageBus = GetMessageBus();
40+
using var messageBus = GetMessageBus();
4141
if (messageBus == null)
4242
return;
4343

@@ -98,7 +98,7 @@ await messageBus.PublishAsync(new SimpleMessageA
9898

9999
public virtual async Task CanSendMessageAsync()
100100
{
101-
var messageBus = GetMessageBus();
101+
using var messageBus = GetMessageBus();
102102
if (messageBus == null)
103103
return;
104104

@@ -133,7 +133,7 @@ await messageBus.PublishAsync(new SimpleMessageA
133133

134134
public virtual async Task CanHandleNullMessageAsync()
135135
{
136-
var messageBus = GetMessageBus();
136+
using var messageBus = GetMessageBus();
137137
if (messageBus == null)
138138
return;
139139

@@ -161,7 +161,7 @@ await messageBus.SubscribeAsync<object>(msg =>
161161

162162
public virtual async Task CanSendDerivedMessageAsync()
163163
{
164-
var messageBus = GetMessageBus();
164+
using var messageBus = GetMessageBus();
165165
if (messageBus == null)
166166
return;
167167

@@ -193,7 +193,7 @@ await messageBus.PublishAsync(new DerivedSimpleMessageA
193193

194194
public virtual async Task CanSendMappedMessageAsync()
195195
{
196-
var messageBus = GetMessageBus(b =>
196+
using var messageBus = GetMessageBus(b =>
197197
{
198198
b.MessageTypeMappings.Add(nameof(SimpleMessageA), typeof(SimpleMessageA));
199199
return b;
@@ -230,7 +230,7 @@ await messageBus.PublishAsync(new SimpleMessageA
230230
public virtual async Task CanSendDelayedMessageAsync()
231231
{
232232
const int numConcurrentMessages = 1000;
233-
var messageBus = GetMessageBus();
233+
using var messageBus = GetMessageBus();
234234
if (messageBus == null)
235235
return;
236236

@@ -276,7 +276,7 @@ await messageBus.PublishAsync(new SimpleMessageA
276276
public virtual async Task CanSubscribeConcurrentlyAsync()
277277
{
278278
const int iterations = 100;
279-
var messageBus = GetMessageBus();
279+
using var messageBus = GetMessageBus();
280280
if (messageBus == null)
281281
return;
282282

@@ -367,7 +367,7 @@ await bus.SubscribeAsync<SimpleMessageA>(msg =>
367367

368368
public virtual async Task CanSendMessageToMultipleSubscribersAsync()
369369
{
370-
var messageBus = GetMessageBus();
370+
using var messageBus = GetMessageBus();
371371
if (messageBus == null)
372372
return;
373373

@@ -405,7 +405,7 @@ await messageBus.PublishAsync(new SimpleMessageA
405405

406406
public virtual async Task CanTolerateSubscriberFailureAsync()
407407
{
408-
var messageBus = GetMessageBus();
408+
using var messageBus = GetMessageBus();
409409
if (messageBus == null)
410410
return;
411411

@@ -422,10 +422,7 @@ await messageBus.SubscribeAsync<SimpleMessageA>(msg =>
422422
Assert.Equal("Hello", msg.Data);
423423
countdown.Signal();
424424
});
425-
await messageBus.SubscribeAsync<SimpleMessageA>(msg =>
426-
{
427-
throw new Exception();
428-
});
425+
await messageBus.SubscribeAsync<SimpleMessageA>(msg => throw new Exception());
429426
await messageBus.SubscribeAsync<SimpleMessageA>(msg =>
430427
{
431428
Assert.Equal("Hello", msg.Data);
@@ -452,7 +449,7 @@ await messageBus.PublishAsync(new SimpleMessageA
452449

453450
public virtual async Task WillOnlyReceiveSubscribedMessageTypeAsync()
454451
{
455-
var messageBus = GetMessageBus();
452+
using var messageBus = GetMessageBus();
456453
if (messageBus == null)
457454
return;
458455

@@ -484,7 +481,7 @@ await messageBus.PublishAsync(new SimpleMessageA
484481

485482
public virtual async Task WillReceiveDerivedMessageTypesAsync()
486483
{
487-
var messageBus = GetMessageBus();
484+
using var messageBus = GetMessageBus();
488485
if (messageBus == null)
489486
return;
490487

@@ -520,7 +517,7 @@ await messageBus.PublishAsync(new SimpleMessageC
520517

521518
public virtual async Task CanSubscribeToRawMessagesAsync()
522519
{
523-
var messageBus = GetMessageBus();
520+
using var messageBus = GetMessageBus();
524521
if (messageBus == null)
525522
return;
526523

@@ -558,7 +555,7 @@ await messageBus.PublishAsync(new SimpleMessageC
558555

559556
public virtual async Task CanSubscribeToAllMessageTypesAsync()
560557
{
561-
var messageBus = GetMessageBus();
558+
using var messageBus = GetMessageBus();
562559
if (messageBus == null)
563560
return;
564561

@@ -593,7 +590,7 @@ await messageBus.PublishAsync(new SimpleMessageC
593590

594591
public virtual async Task WontKeepMessagesWithNoSubscribersAsync()
595592
{
596-
var messageBus = GetMessageBus();
593+
using var messageBus = GetMessageBus();
597594
if (messageBus == null)
598595
return;
599596

@@ -623,7 +620,7 @@ await messageBus.SubscribeAsync<SimpleMessageA>(msg =>
623620

624621
public virtual async Task CanCancelSubscriptionAsync()
625622
{
626-
var messageBus = GetMessageBus();
623+
using var messageBus = GetMessageBus();
627624
if (messageBus == null)
628625
return;
629626

@@ -641,8 +638,8 @@ await messageBus.SubscribeAsync<SimpleMessageA>(async msg =>
641638
countdown.Signal();
642639
}, cancellationTokenSource.Token);
643640

644-
await messageBus.SubscribeAsync<object>(msg => countdown.Signal());
645-
641+
// NOTE: This subscriber will not be canceled.
642+
await messageBus.SubscribeAsync<object>(_ => countdown.Signal());
646643
await messageBus.PublishAsync(new SimpleMessageA
647644
{
648645
Data = "Hello"
@@ -652,7 +649,7 @@ await messageBus.PublishAsync(new SimpleMessageA
652649
Assert.Equal(0, countdown.CurrentCount);
653650
Assert.Equal(1, messageCount);
654651

655-
countdown = new AsyncCountdownEvent(1);
652+
countdown.AddCount(1);
656653
await messageBus.PublishAsync(new SimpleMessageA
657654
{
658655
Data = "Hello"
@@ -670,7 +667,7 @@ await messageBus.PublishAsync(new SimpleMessageA
670667

671668
public virtual async Task CanReceiveFromMultipleSubscribersAsync()
672669
{
673-
var messageBus1 = GetMessageBus();
670+
using var messageBus1 = GetMessageBus();
674671
if (messageBus1 == null)
675672
return;
676673

@@ -683,7 +680,7 @@ await messageBus1.SubscribeAsync<SimpleMessageA>(msg =>
683680
countdown1.Signal();
684681
});
685682

686-
var messageBus2 = GetMessageBus();
683+
using var messageBus2 = GetMessageBus();
687684
try
688685
{
689686
var countdown2 = new AsyncCountdownEvent(1);
@@ -698,9 +695,9 @@ await messageBus1.PublishAsync(new SimpleMessageA
698695
Data = "Hello"
699696
});
700697

701-
await countdown1.WaitAsync(TimeSpan.FromSeconds(20));
698+
await countdown1.WaitAsync(TimeSpan.FromSeconds(5));
702699
Assert.Equal(0, countdown1.CurrentCount);
703-
await countdown2.WaitAsync(TimeSpan.FromSeconds(20));
700+
await countdown2.WaitAsync(TimeSpan.FromSeconds(5));
704701
Assert.Equal(0, countdown2.CurrentCount);
705702
}
706703
finally
@@ -716,10 +713,13 @@ await messageBus1.PublishAsync(new SimpleMessageA
716713

717714
public virtual void CanDisposeWithNoSubscribersOrPublishers()
718715
{
719-
var messageBus = GetMessageBus();
716+
using var messageBus = GetMessageBus();
720717
if (messageBus == null)
721718
return;
722719

723-
using (messageBus) { }
720+
using (messageBus)
721+
{
722+
// Empty using statement to ensure Dispose is called
723+
}
724724
}
725725
}

src/Foundatio/Messaging/InMemoryMessageBus.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Concurrent;
33
using System.Threading;
44
using System.Threading.Tasks;
@@ -75,4 +75,10 @@ protected override async Task PublishImplAsync(string messageType, object messag
7575
_logger.LogWarning(ex, "Error sending message to subscribers: {Message}", ex.Message);
7676
}
7777
}
78+
79+
public override void Dispose()
80+
{
81+
_messageCounts.Clear();
82+
base.Dispose();
83+
}
7884
}

0 commit comments

Comments
 (0)