Skip to content

Commit 039054a

Browse files
committed
allow CancellationToken
1 parent 4841c43 commit 039054a

File tree

2 files changed

+14
-2
lines changed

2 files changed

+14
-2
lines changed
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
using System.Threading;
2+
13
namespace BotSharp.Abstraction.Infrastructures.Events;
24

35
public interface IEventSubscriber
46
{
57
Task SubscribeAsync(string channel, Func<string, string, Task> received);
68

7-
Task SubscribeAsync(string channel, string group, bool priorityEnabled, Func<string, string, Task> received);
9+
Task SubscribeAsync(string channel, string group, bool priorityEnabled,
10+
Func<string, string, Task> received,
11+
CancellationToken? stoppingToken = null);
812
}

src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisSubscriber.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ await _subscriber.SubscribeAsync(channel, async (ch, message) =>
2525
});
2626
}
2727

28-
public async Task SubscribeAsync(string channel, string group, bool priorityEnabled, Func<string, string, Task> received)
28+
public async Task SubscribeAsync(string channel, string group, bool priorityEnabled,
29+
Func<string, string, Task> received,
30+
CancellationToken? stoppingToken = null)
2931
{
3032
var db = _redis.GetDatabase();
3133

@@ -44,6 +46,12 @@ public async Task SubscribeAsync(string channel, string group, bool priorityEnab
4446
{
4547
await Task.Delay(100);
4648

49+
if (stoppingToken.HasValue && stoppingToken.Value.IsCancellationRequested)
50+
{
51+
_logger.LogInformation($"Stopping consumer channel & group: [{channel}, {group}]");
52+
break;
53+
}
54+
4755
if (priorityEnabled)
4856
{
4957
if (await HandleGroupMessage(db, $"{channel}-{EventPriority.High}", group, received) > 0)

0 commit comments

Comments
 (0)