Skip to content

Commit d39f1a0

Browse files
committed
Add SubscriberOptions
1 parent 3a34d45 commit d39f1a0

File tree

6 files changed

+41
-24
lines changed

6 files changed

+41
-24
lines changed

samples/CleanArchitectureSample/src/Api/Handlers/ClientEventStreamHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public async IAsyncEnumerable<ClientEvent> Handle(
2828
GetEventStream message,
2929
[EnumeratorCancellation] CancellationToken cancellationToken)
3030
{
31-
await foreach (var evt in mediator.SubscribeAsync<IDispatchToClient>(cancellationToken: cancellationToken))
31+
await foreach (var evt in mediator.SubscribeAsync<IDispatchToClient>(cancellationToken))
3232
{
3333
yield return new ClientEvent(evt.GetType().Name, evt);
3434
}

src/Foundatio.Mediator.Abstractions/HandlerRegistry.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -430,22 +430,20 @@ public bool HasSubscribers
430430
/// The notification type to subscribe to. Can be a concrete type, base class, or interface.
431431
/// Messages are matched using <see cref="Type.IsAssignableFrom"/>.
432432
/// </typeparam>
433-
/// <param name="maxCapacity">
434-
/// Maximum number of items buffered per subscriber. When full, the oldest item is dropped.
435-
/// Default is 100.
436-
/// </param>
437433
/// <param name="cancellationToken">Token that ends the subscription when cancelled.</param>
434+
/// <param name="options">Optional settings controlling buffer capacity and other subscription behavior.</param>
438435
/// <returns>An async stream of matching notifications.</returns>
439436
public async IAsyncEnumerable<T> SubscribeAsync<T>(
440-
int maxCapacity = 100,
441-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
437+
[EnumeratorCancellation] CancellationToken cancellationToken = default,
438+
SubscriberOptions? options = null)
442439
{
443440
if (_disposed)
444441
throw new ObjectDisposedException(nameof(HandlerRegistry));
445442

443+
var maxCapacity = options?.MaxCapacity ?? 100;
446444
var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(maxCapacity)
447445
{
448-
FullMode = BoundedChannelFullMode.DropOldest,
446+
FullMode = options?.FullMode ?? BoundedChannelFullMode.DropOldest,
449447
SingleWriter = false,
450448
SingleReader = true
451449
});

src/Foundatio.Mediator.Abstractions/IMediator.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,8 @@ public interface IMediator
128128
/// The notification type to subscribe to. Can be a concrete type, base class, or interface.
129129
/// Messages are matched at runtime using <see cref="Type.IsAssignableFrom"/>.
130130
/// </typeparam>
131-
/// <param name="maxCapacity">
132-
/// Maximum number of items buffered per subscriber. When full, the oldest item is dropped.
133-
/// Default is 100.
134-
/// </param>
135131
/// <param name="cancellationToken">Token that ends the subscription when cancelled.</param>
132+
/// <param name="options">Optional settings controlling buffer capacity and other subscription behavior.</param>
136133
/// <returns>An async stream of matching notifications.</returns>
137134
/// <remarks>
138135
/// Typical usage is in streaming SSE endpoints that push domain events to connected clients:
@@ -141,9 +138,9 @@ public interface IMediator
141138
/// GetEventStream message,
142139
/// CancellationToken ct)
143140
/// {
144-
/// return mediator.SubscribeAsync&lt;INotification&gt;(cancellationToken: ct);
141+
/// return mediator.SubscribeAsync&lt;INotification&gt;(ct);
145142
/// }
146143
/// </code>
147144
/// </remarks>
148-
IAsyncEnumerable<T> SubscribeAsync<T>(int maxCapacity = 100, CancellationToken cancellationToken = default);
145+
IAsyncEnumerable<T> SubscribeAsync<T>(CancellationToken cancellationToken = default, SubscriberOptions? options = null);
149146
}

src/Foundatio.Mediator.Abstractions/Mediator.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public ValueTask PublishAsync(object message, CancellationToken cancellationToke
8383
}
8484

8585
/// <inheritdoc />
86-
public IAsyncEnumerable<T> SubscribeAsync<T>(int maxCapacity = 100, CancellationToken cancellationToken = default)
86+
public IAsyncEnumerable<T> SubscribeAsync<T>(CancellationToken cancellationToken = default, SubscriberOptions? options = null)
8787
{
88-
return _registry.SubscribeAsync<T>(maxCapacity, cancellationToken);
88+
return _registry.SubscribeAsync<T>(cancellationToken, options);
8989
}
9090
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System.Threading.Channels;
2+
3+
namespace Foundatio.Mediator;
4+
5+
/// <summary>
6+
/// Options for configuring a dynamic subscription created via
7+
/// <see cref="IMediator.SubscribeAsync{T}"/>.
8+
/// </summary>
9+
public class SubscriberOptions
10+
{
11+
/// <summary>
12+
/// Maximum number of items buffered per subscriber. When full, the behavior is
13+
/// determined by <see cref="FullMode"/>. Default is 100.
14+
/// </summary>
15+
public int MaxCapacity { get; set; } = 100;
16+
17+
/// <summary>
18+
/// The behavior when the buffer is full and a new item arrives.
19+
/// Default is <see cref="BoundedChannelFullMode.DropOldest"/>.
20+
/// </summary>
21+
public BoundedChannelFullMode FullMode { get; set; } = BoundedChannelFullMode.DropOldest;
22+
}

tests/Foundatio.Mediator.Tests/Integration/E2E_SubscribeAsyncTests.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public async Task SubscribeAsync_ReceivesConcrete_WhenPublished()
4141

4242
var subscriberTask = Task.Run(async () =>
4343
{
44-
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cancellationToken: cts.Token))
44+
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cts.Token))
4545
{
4646
received.Add(item);
4747
}
@@ -77,7 +77,7 @@ public async Task SubscribeAsync_ReceivesInterface_WhenDerivedTypePublished()
7777

7878
var subscriberTask = Task.Run(async () =>
7979
{
80-
await foreach (var item in mediator.SubscribeAsync<ITestEvent>(cancellationToken: cts.Token))
80+
await foreach (var item in mediator.SubscribeAsync<ITestEvent>(cts.Token))
8181
{
8282
received.Add(item);
8383
}
@@ -113,7 +113,7 @@ public async Task SubscribeAsync_DoesNotReceive_UnrelatedTypes()
113113

114114
var subscriberTask = Task.Run(async () =>
115115
{
116-
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cancellationToken: cts.Token))
116+
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cts.Token))
117117
{
118118
received.Add(item);
119119
}
@@ -149,13 +149,13 @@ public async Task SubscribeAsync_MultipleSubscribers_AllReceive()
149149

150150
var sub1 = Task.Run(async () =>
151151
{
152-
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cancellationToken: cts.Token))
152+
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cts.Token))
153153
received1.Add(item);
154154
});
155155

156156
var sub2 = Task.Run(async () =>
157157
{
158-
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cancellationToken: cts.Token))
158+
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cts.Token))
159159
received2.Add(item);
160160
});
161161

@@ -192,7 +192,7 @@ public async Task SubscribeAsync_CancellationEndsStream()
192192

193193
var subscriberTask = Task.Run(async () =>
194194
{
195-
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cancellationToken: cts.Token))
195+
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cts.Token))
196196
{
197197
received.Add(item);
198198
if (received.Count == 2)
@@ -233,7 +233,7 @@ public async Task SubscribeAsync_DropsOldest_WhenBufferFull()
233233
// Use a tiny buffer (capacity 2) so we can verify drop behavior.
234234
var subscriberTask = Task.Run(async () =>
235235
{
236-
await foreach (var item in mediator.SubscribeAsync<TestEvent>(maxCapacity: 2, cancellationToken: cts.Token))
236+
await foreach (var item in mediator.SubscribeAsync<TestEvent>(cts.Token, new SubscriberOptions { MaxCapacity = 2 }))
237237
{
238238
received.Add(item);
239239
}
@@ -279,7 +279,7 @@ public async Task HasSubscribers_ReflectsActiveSubscriptions()
279279

280280
var subscriberTask = Task.Run(async () =>
281281
{
282-
await foreach (var _ in mediator.SubscribeAsync<TestEvent>(cancellationToken: cts.Token))
282+
await foreach (var _ in mediator.SubscribeAsync<TestEvent>(cts.Token))
283283
{ }
284284
});
285285

0 commit comments

Comments
 (0)