Skip to content

Commit 4677fa7

Browse files
committed
Fix bug with dynamic subscribers
1 parent 31974be commit 4677fa7

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

src/Foundatio.Mediator/HandlerGenerator.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,6 +1355,7 @@ private static void GenerateCascadingHandlerCallsForeachAwait(
13551355
}
13561356

13571357
source.AppendLine($"var cascadeHandlers_{publishItem.Name} = ((global::Foundatio.Mediator.Mediator)mediator).Registry.GetPublishHandlersForType(typeof({typeFullName}));");
1358+
source.AppendLine($"if (((global::Foundatio.Mediator.Mediator)mediator).Registry.HasSubscribers) ((global::Foundatio.Mediator.Mediator)mediator).Registry.TryWriteSubscription({access});");
13581359
source.AppendLine($"for (int i_{publishItem.Name} = 0; i_{publishItem.Name} < cascadeHandlers_{publishItem.Name}.Length; i_{publishItem.Name}++)");
13591360
source.AppendLine("{");
13601361
source.IncrementIndent();
@@ -1396,6 +1397,7 @@ private static void GenerateCascadingHandlerCallsTaskWhenAll(
13961397
}
13971398

13981399
source.AppendLine($"var cascadeHandlers_{publishItem.Name} = ((global::Foundatio.Mediator.Mediator)mediator).Registry.GetPublishHandlersForType(typeof({typeFullName}));");
1400+
source.AppendLine($"if (((global::Foundatio.Mediator.Mediator)mediator).Registry.HasSubscribers) ((global::Foundatio.Mediator.Mediator)mediator).Registry.TryWriteSubscription({access});");
13991401
source.AppendLine($"for (int i_{publishItem.Name} = 0; i_{publishItem.Name} < cascadeHandlers_{publishItem.Name}.Length; i_{publishItem.Name}++)");
14001402
source.AppendLine("{");
14011403
source.AppendLine($" allCascadeTasks.Add(cascadeHandlers_{publishItem.Name}[i_{publishItem.Name}](mediator, {access}, cancellationToken));");
@@ -1435,6 +1437,7 @@ private static void GenerateCascadingHandlerCallsFireAndForget(
14351437
}
14361438

14371439
source.AppendLine($"var cascadeHandlers_{publishItem.Name} = ((global::Foundatio.Mediator.Mediator)mediator).Registry.GetPublishHandlersForType(typeof({typeFullName}));");
1440+
source.AppendLine($"if (((global::Foundatio.Mediator.Mediator)mediator).Registry.HasSubscribers) ((global::Foundatio.Mediator.Mediator)mediator).Registry.TryWriteSubscription({access});");
14381441
source.AppendLine($"for (int i_{publishItem.Name} = 0; i_{publishItem.Name} < cascadeHandlers_{publishItem.Name}.Length; i_{publishItem.Name}++)");
14391442
source.AppendLine("{");
14401443
source.IncrementIndent();

tests/Foundatio.Mediator.Tests/Integration/E2E_SubscribeAsyncTests.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ public record TestEvent(string Name) : ITestEvent;
1717
public record OtherEvent(string Name) : ITestEvent;
1818
public record UnrelatedEvent(string Name);
1919

20+
public record CascadeCommand(string Name);
21+
22+
public class CascadeHandler
23+
{
24+
public (Result, TestEvent) Handle(CascadeCommand cmd)
25+
=> (Result.Success(), new TestEvent(cmd.Name));
26+
}
27+
2028
/// <summary>Polls until <paramref name="condition"/> returns true or the timeout expires.</summary>
2129
private static async Task WaitUntilAsync(Func<bool> condition, int timeoutMs = 2000)
2230
{
@@ -355,4 +363,39 @@ await Assert.ThrowsAsync<ObjectDisposedException>(async () =>
355363
{ }
356364
});
357365
}
366+
367+
[Fact]
368+
public async Task SubscribeAsync_ReceivesCascadingEvents()
369+
{
370+
var services = new ServiceCollection();
371+
services.AddLogging(c => c.AddTestLogger(o => o.UseOutputHelper(() => _output)));
372+
services.AddMediator(b => b.AddAssembly<TestEvent>());
373+
374+
await using var provider = services.BuildServiceProvider();
375+
var registry = provider.GetRequiredService<HandlerRegistry>();
376+
var mediator = provider.GetRequiredService<IMediator>();
377+
378+
using var cts = new CancellationTokenSource();
379+
var received = new List<TestEvent>();
380+
381+
var subscriberTask = Task.Run(async () =>
382+
{
383+
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cts.Token))
384+
{
385+
received.Add(item);
386+
}
387+
});
388+
389+
await WaitUntilAsync(() => registry.HasSubscribers);
390+
391+
// Invoke a command whose handler returns a cascading TestEvent via tuple
392+
await mediator.InvokeAsync<Result>(new CascadeCommand("from-cascade"));
393+
394+
await WaitUntilAsync(() => received.Count >= 1);
395+
cts.Cancel();
396+
await subscriberTask;
397+
398+
Assert.Single(received);
399+
Assert.Equal("from-cascade", received[0].Name);
400+
}
358401
}

0 commit comments

Comments
 (0)